diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java index 2ad9482421..d303ba9b79 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java @@ -52,12 +52,22 @@ public interface ExecutionContext extends Executor { */ Task submit(Map properties, Callable callable); - /** {@link ExecutionManager#submit(Runnable) */ + /** {@link ExecutionManager#submit(Runnable) + * @deprecated since 0.13.0 pass a display name or a more detailed map */ + @Deprecated Task submit(Runnable runnable); - /** {@link ExecutionManager#submit(Callable) */ + /** {@link ExecutionManager#submit(Callable) + * @deprecated since 0.13.0 pass a display name or a more detailed map */ + @Deprecated Task submit(Callable callable); + /** {@link ExecutionManager#submit(String, Runnable) */ + Task submit(String displayName, Runnable runnable); + + /** {@link ExecutionManager#submit(String, Callable) */ + Task submit(String displayName, Callable callable); + /** See {@link ExecutionManager#submit(Map, TaskAdaptable)}. */ Task submit(TaskAdaptable task); @@ -86,18 +96,23 @@ public interface ExecutionContext extends Executor { // TODO reference ImmediateSupplier when that class is moved to utils project @Beta Maybe getImmediately(Object callableOrSupplierOrTask); + /** As {@link #getImmediately(Object)} but strongly typed for a task. */ + @Beta + Maybe getImmediately(Task callableOrSupplierOrTask); /** - * Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}. + * Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}. *

- * Implementations will typically attempt to execute in the current thread, with appropriate - * configuration to make it look like it is in a sub-thread, - * ie registering this as a task and allowing - * context methods on tasks to see the given sub-task. + * This is efficient in that it may typically attempt to execute in the current thread, + * with appropriate configuration to make it look like it is in a sub-thread, + * ie registering this as a task and allowing context methods on tasks to see the given sub-task. + * However it will normally be non-blocking which reduces overhead and + * is permissible within a {@link #getImmediately(Object)} task *

- * If the argument has already been submitted it simply blocks on it. + * If the argument has already been submitted it simply blocks on it + * (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}). * - * @param task + * @param task the task whose result is being sought * @return result of the task execution */ @Beta diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java index 97108aba36..347a127035 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionManager.java @@ -79,11 +79,21 @@ public interface ExecutionManager { // /** returns all tasks known to this manager (immutable) */ // public Set> getAllTasks(); - /** see {@link #submit(Map, TaskAdaptable)} */ + /** see {@link #submit(Map, TaskAdaptable)} + * @deprecated since 0.13.0 pass displayName or map */ + @Deprecated public Task submit(Runnable r); + /** see {@link #submit(Map, TaskAdaptable)} + * @deprecated since 0.13.0 pass displayName or map */ + @Deprecated + public Task submit(Callable r); + + /** see {@link #submit(Map, TaskAdaptable)} */ + public Task submit(String displayName, Runnable c); + /** see {@link #submit(Map, TaskAdaptable)} */ - public Task submit(Callable c); + public Task submit(String displayName, Callable c); /** see {@link #submit(Map, TaskAdaptable)} */ public Task submit(TaskAdaptable task); diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java index 515ec6b169..c1a2c21cc8 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ManagementContext.java @@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager; import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.api.objs.BrooklynObject; +import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry; import org.apache.brooklyn.config.StringConfigMap; import org.apache.brooklyn.util.guava.Maybe; @@ -165,6 +166,9 @@ public interface ManagementContext { */ ExecutionContext getExecutionContext(Entity entity); + /** As {@link #getExecutionContext(Entity)} where there is also an adjunct */ + ExecutionContext getExecutionContext(Entity e, EntityAdjunct a); + /** * Returns a {@link SubscriptionContext} instance representing subscriptions * (from the {@link SubscriptionManager}) associated with this entity, and capable @@ -176,6 +180,9 @@ public interface ManagementContext { */ SubscriptionContext getSubscriptionContext(Entity entity); + /** As {@link #getSubscriptionContext(Entity)} where there is also an adjunct */ + SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a); + /** * Returns a {@link SubscriptionContext} instance representing subscriptions * (from the {@link SubscriptionManager}) associated with this location, and capable diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java index 1fa327efa0..8302ba8f9a 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java @@ -42,8 +42,10 @@ public interface SubscriptionManager { * * The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)} later. *

- * The listener callback is in-order single-threaded and synchronized on this object. The flags - * parameters can include the following: + * The listener callback is in-order single-threaded and synchronized on this object. + * In other words message delivery from a producer to a given subscriber is in publish order + * (or in the case of a late subscriber getting initial values, in subscribe order). + * The flags parameters can include the following: *

    *
  • subscriber - object to identify the subscriber (e.g. entity, or console session uid) *
  • in future - control parameters for the subscription (period, minimum delta for updates, etc) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java index 98ccb0bddf..3840e2d685 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigParametersYamlTest.java @@ -934,13 +934,12 @@ public void testManuallyAddInTaskOfOtherEntity() throws Exception { Entity app = createStartWaitAndLogApplication(yaml); final TestEntity entity1 = (TestEntity) Iterables.getOnlyElement(app.getChildren()); - TestEntity entity2 = entity1.getExecutionContext().submit(new Callable() { - public TestEntity call() { - TestEntity entity2 = entity1.addChild(EntitySpec.create(TestEntity.class)); - entity2.start(Collections.emptyList()); - return entity2; - } - }).get(); + TestEntity entity2 = entity1.getExecutionContext().submit("create and start", () -> { + TestEntity entity2i = entity1.addChild(EntitySpec.create(TestEntity.class)); + entity2i.start(Collections.emptyList()); + return entity2i; + }) + .get(); Entities.dumpInfo(app); diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java index 63aba8e84d..04b6bf124a 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java @@ -327,7 +327,7 @@ public Entity call() throws Exception { protected void runConcurrentWorker(Supplier taskSupplier) { Collection> results = new ArrayList<>(); for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) { - Task result = app.getExecutionContext().submit(taskSupplier.get()); + Task result = app.getExecutionContext().submit("parallel "+i, taskSupplier.get()); results.add(result); } for (Task result : results) { @@ -550,7 +550,7 @@ public Maybe call() throws Exception { } }; if (execInTask) { - Task> task = ((EntityInternal)context).getExecutionContext().submit(job); + Task> task = ((EntityInternal)context).getExecutionContext().submit("Resolving DSL for test: "+dsl, job); task.get(Asserts.DEFAULT_LONG_TIMEOUT); assertTrue(task.isDone()); return task.get(); diff --git a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java index 2d0cf7dd61..682dedb61f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java +++ b/core/src/main/java/org/apache/brooklyn/core/config/ConfigConstraints.java @@ -25,12 +25,15 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import org.apache.brooklyn.core.objs.BrooklynObjectPredicate; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,12 +115,21 @@ public ConfigConstraints(T brooklynObject) { abstract Iterable> getBrooklynObjectTypeConfigKeys(); public Iterable> getViolations() { - // TODO in new task - return validateAll(); + ExecutionContext exec = + getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() : + getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() : + null; + if (exec!=null) { + return exec.get( + Tasks.>>builder().dynamic(false).displayName("Validating config").body( + () -> validateAll() ).build() ); + } else { + return validateAll(); + } } @SuppressWarnings("unchecked") - private Iterable> validateAll() { + protected Iterable> validateAll() { List> violating = Lists.newLinkedList(); Iterable> configKeys = getBrooklynObjectTypeConfigKeys(); LOG.trace("Checking config keys on {}: {}", getBrooklynObject(), configKeys); diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index bd7df0698b..dfbbc8f036 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -529,6 +529,7 @@ public void setManagementContext(ManagementContextInternal managementContext) { * through this method. Internally, all attribute updates synch on this object. Code wishing to * update attributes or publish while holding some other lock should acquire the monitor on this * object first to prevent deadlock. */ + @Beta protected Object getAttributesSynchObjectInternal() { return attributesInternal.getSynchObjectInternal(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java index 1883166594..30b234813b 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/trait/StartableMethods.java @@ -49,20 +49,20 @@ private StartableMethods() {} /** Common implementation for start in parent nodes; just invokes start on all children of the entity */ public static void start(Entity e, Collection locations) { log.debug("Starting entity "+e+" at "+locations); - DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked(); + DynamicTasks.get(startingChildren(e, locations), e); } /** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */ public static void stop(Entity e) { log.debug("Stopping entity "+e); - DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked(); + DynamicTasks.get(stoppingChildren(e), e); if (log.isDebugEnabled()) log.debug("Stopped entity "+e); } /** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */ public static void restart(Entity e) { log.debug("Restarting entity "+e); - DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked(); + DynamicTasks.get(restartingChildren(e), e); if (log.isDebugEnabled()) log.debug("Restarted entity "+e); } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index 940057e18e..dd7d22db59 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -139,7 +139,7 @@ public void start() { for (final Callable oneOffJob : oneOffJobs) { Task task = Tasks.builder().dynamic(false).body((Callable) oneOffJob).displayName("Poll").description("One-time poll job "+oneOffJob).build(); - oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task)); + oneOffTasks.add(feed.getExecutionContext().submit(task)); } Duration minPeriod = null; diff --git a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java index f8a7cce698..7906616b97 100644 --- a/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java +++ b/core/src/main/java/org/apache/brooklyn/core/location/BasicMachineDetails.java @@ -96,10 +96,7 @@ public String toString() { */ @Beta public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) { - return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location)) - .orSubmitAsync() - .asTask()) - .getUnchecked(); + return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location))); } /** diff --git a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java index 40f71e26a4..35bc2f8495 100644 --- a/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java +++ b/core/src/main/java/org/apache/brooklyn/core/location/access/BrooklynAccessUtils.java @@ -119,7 +119,7 @@ public static HostAndPort getBrooklynAccessibleAddress(Entity entity, int port) public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) { ProcessTaskWrapper task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget) .summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask(); - DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded(); + DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded(); if (task.asTask().isError()) { log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask())); return ""; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java index 5cb102743f..7d35ac4022 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java @@ -34,6 +34,7 @@ import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; +import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.DynamicTasks; @@ -59,13 +60,14 @@ /** Provides utilities for making Tasks easier to work with in Brooklyn. * Main thing at present is to supply (and find) wrapped entities for tasks to understand the * relationship of the entity to the task. - * TODO Longer term it would be better to remove 'tags' on Tasks and use a strongly typed context object. + *

    + * Eventually it may be better to replace these 'tags' on Tasks with strongly typed context objects. * (Tags there are used mainly for determining who called it (caller), what they called it on (target entity), * and what type of task it is (effector, schedule/sensor, etc).) */ public class BrooklynTaskTags extends TaskTags { - private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.WrappedEntity.class); + private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.class); /** Tag for tasks which are running on behalf of the management server, rather than any entity */ public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER"; @@ -85,37 +87,66 @@ public class BrooklynTaskTags extends TaskTags { // ------------- entity tags ------------------------- - public static class WrappedEntity { + public abstract static class WrappedItem { + /** @deprecated since 0.13.0 going private; use {@link #getWrappingType()} */ + @Deprecated public final String wrappingType; - public final Entity entity; - protected WrappedEntity(String wrappingType, Entity entity) { + protected WrappedItem(String wrappingType) { Preconditions.checkNotNull(wrappingType); - Preconditions.checkNotNull(entity); this.wrappingType = wrappingType; - this.entity = entity; + } + public abstract T unwrap(); + public String getWrappingType() { + return wrappingType; } @Override public String toString() { - return "Wrapped["+wrappingType+":"+entity+"]"; + return "Wrapped["+getWrappingType()+":"+unwrap()+"]"; } @Override public int hashCode() { - return Objects.hashCode(entity, wrappingType); + return Objects.hashCode(unwrap(), getWrappingType()); } @Override public boolean equals(Object obj) { if (this==obj) return true; - if (!(obj instanceof WrappedEntity)) return false; + if (!(obj instanceof WrappedItem)) return false; return - Objects.equal(entity, ((WrappedEntity)obj).entity) && - Objects.equal(wrappingType, ((WrappedEntity)obj).wrappingType); + Objects.equal(unwrap(), ((WrappedItem)obj).unwrap()) && + Objects.equal(getWrappingType(), ((WrappedItem)obj).getWrappingType()); + } + } + public static class WrappedEntity extends WrappedItem { + /** @deprecated since 0.13.0 going private; use {@link #unwrap()} */ + @Deprecated + public final Entity entity; + protected WrappedEntity(String wrappingType, Entity entity) { + super(wrappingType); + this.entity = Preconditions.checkNotNull(entity); + } + @Override + public Entity unwrap() { + return entity; } } + public static class WrappedObject extends WrappedItem { + private final T object; + protected WrappedObject(String wrappingType, T object) { + super(wrappingType); + this.object = Preconditions.checkNotNull(object); + } + @Override + public T unwrap() { + return object; + } + } public static final String CONTEXT_ENTITY = "contextEntity"; public static final String CALLER_ENTITY = "callerEntity"; public static final String TARGET_ENTITY = "targetEntity"; + public static final String CONTEXT_ADJUNCT = "contextAdjunct"; + /** * Marks a task as running in the context of the entity. This means * resolving any relative/context sensitive values against that entity. @@ -138,6 +169,15 @@ public static WrappedEntity tagForTargetEntity(Entity entity) { return new WrappedEntity(TARGET_ENTITY, entity); } + /** + * As {@link #tagForContextEntity(Entity)} but wrapping an adjunct. + * Tasks with this tag will also have a {@link #tagForContextEntity(Entity)}. + */ + public static WrappedObject tagForContextAdjunct(EntityAdjunct adjunct) { + return new WrappedObject(CONTEXT_ADJUNCT, adjunct); + } + + public static WrappedEntity getWrappedEntityTagOfType(Task t, String wrappingType) { if (t==null) return null; return getWrappedEntityTagOfType( getTagsFast(t), wrappingType); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java index 0cd18fc197..de9964c1f2 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/EntityManagementUtils.java @@ -91,9 +91,11 @@ public static T createUnstarted(ManagementContext mgmt, */ @Beta public static T createUnstarted(ManagementContext mgmt, EntitySpec spec, Optional entityId) { - // TODO wrap in task - T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId); - return app; + return mgmt.getServerExecutionContext().get(Tasks.builder().dynamic(false) + .displayName("Creating entity "+ + (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) ) + .body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId)) + .build() ); } /** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */ diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java index 0853b9df93..27645bcdaa 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractManagementContext.java @@ -51,6 +51,7 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager; import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.api.objs.BrooklynObject; +import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry; import org.apache.brooklyn.api.typereg.RegisteredType; import org.apache.brooklyn.config.StringConfigMap; @@ -70,6 +71,7 @@ import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl; import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl; +import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; import org.apache.brooklyn.core.typereg.BasicBrooklynTypeRegistry; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.ResourceUtils; @@ -244,6 +246,21 @@ public ExecutionContext getExecutionContext(Entity e) { return ((EntityInternal)e).getExecutionContext(); } } + + @Override + public ExecutionContext getExecutionContext(Entity e, EntityAdjunct adjunct) { + // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity + if (e instanceof AbstractEntityAdjunct) { + ImmutableSet tags = ImmutableSet.of( + BrooklynTaskTags.tagForContextAdjunct(adjunct), + BrooklynTaskTags.tagForContextEntity(e), + this + ); + return new BasicExecutionContext(getExecutionManager(), tags); + } else { + return ((EntityInternal)e).getExecutionContext(); + } + } @Override public ExecutionContext getServerExecutionContext() { @@ -261,6 +278,14 @@ public SubscriptionContext getSubscriptionContext(Entity e) { Map flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e))); return new BasicSubscriptionContext(flags, getSubscriptionManager(), e); } + + @Override + public SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a) { + // BSC is a thin wrapper around SM so fine to create a new one here + Map flags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(e), BrooklynTaskTags.tagForContextAdjunct(a)), + "subscriptionDescription", "adjunct "+a.getId()); + return new BasicSubscriptionContext(flags, getSubscriptionManager(), e); + } @Override public SubscriptionContext getSubscriptionContext(Location loc) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java index c15f770ae6..e7ae59e163 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/AbstractSubscriptionManager.java @@ -30,6 +30,7 @@ import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,4 +148,8 @@ protected Object getSubscriber(Map flags, Subscription s) return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener; } + protected String getSubscriptionDescription(Map flags, Subscription s) { + return s.subscriptionDescription!=null ? s.subscriptionDescription : flags.containsKey("subscriptionDescription") ? Strings.toString(flags.remove("subscriptionDescription")) : null; + } + } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java index 79957680ed..e0e6decc2e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java @@ -305,27 +305,20 @@ protected boolean shouldDeleteTaskImmediately(Task task) { if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return false; - if (task.getSubmittedByTask()!=null) { - Task parent = task.getSubmittedByTask(); - if (executionManager.getTask(parent.getId())==null) { - // parent is already cleaned up - return true; - } - if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) { - // it is a child, let the parent manage this task's death - return false; - } - Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); - if (associatedEntity!=null) { - // this is associated to an entity; destroy only if the entity is unmanaged - return !Entities.isManaged(associatedEntity); - } - // if not associated to an entity, then delete immediately - return true; + if (!isSubmitterExpired(task)) { + return false; + } + if (isChild(task)) { + // parent should manage this task's death; but above already kicks in if parent is not expired, so probably shouldn't come here? + LOG.warn("Unexpected expiry candidacy for "+task); + return false; + } + if (isAssociatedToActiveEntity(task)) { + return false; } // e.g. scheduled tasks, sensor events, etc - // TODO (in future may keep some of these with another limit, based on a new TagCategory) + // (in future may keep some of these with another limit, based on a new TagCategory) // there may also be a server association for server-side tasks which should be kept // (but be careful not to keep too many subscriptions!) @@ -337,7 +330,7 @@ protected boolean shouldDeleteTaskImmediately(Task task) { * {@link #maxTasksPerTag} and {@link #maxTaskAge}. */ protected synchronized int gcTasks() { - // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks. + // NB: be careful with memory usage here: have seen OOME if we get crazy lots of tasks. // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help. // // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That @@ -397,11 +390,21 @@ protected synchronized int gcTasks() { int deletedCount = 0; deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); - deletedCount += expireSubTasksWhoseSubmitterIsExpired(); - int deletedGlobally = expireIfOverCapacityGlobally(); - deletedCount += deletedGlobally; - if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired(); + // if expensive we could optimize task GC here to avoid repeated lookups by + // counting all expired above (not just prev two lines) and skipping if none + // but that seems unlikely + int deletedHere = 0; + while ((deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()) > 0) { + // delete in loop so we don't have descendants sticking around until deleted in later cycles + deletedCount += deletedHere; + } + + deletedHere = expireIfOverCapacityGlobally(); + deletedCount += deletedHere; + while (deletedHere > 0) { + deletedCount += (deletedHere = expireHistoricTasksNowReadyForImmediateDeletion()); + } return deletedCount; } @@ -471,7 +474,9 @@ protected void expireTransientTasks() { } } - protected int expireSubTasksWhoseSubmitterIsExpired() { + protected int expireHistoricTasksNowReadyForImmediateDeletion() { + // find tasks which weren't ready for immediate deletion, but which now are + // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS)) return 0; @@ -480,13 +485,15 @@ protected int expireSubTasksWhoseSubmitterIsExpired() { Collection> tasksToDelete = MutableList.of(); try { for (Task task: allTasks) { - if (!task.isDone()) continue; - Task submitter = task.getSubmittedByTask(); - // if we've leaked, ie a subtask which is not a child task, - // and the submitter is GC'd, then delete this also - if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) { - tasksToDelete.add(task); + if (!shouldDeleteTaskImmediately(task)) { + // 2017-09 previously we only checked done and submitter expired, and deleted if both were true + // so could pick up even things that were non_transient -- now much stricter + continue; + } else { + if (LOG.isTraceEnabled()) LOG.trace("Deleting task which really is no longer wanted: "+task+" (submitted by "+task.getSubmittedByTask()+")"); } + + tasksToDelete.add(task); } } catch (ConcurrentModificationException e) { @@ -500,6 +507,32 @@ protected int expireSubTasksWhoseSubmitterIsExpired() { return tasksToDelete.size(); } + private boolean isAssociatedToActiveEntity(Task task) { + Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); + if (associatedEntity==null) { + return false; + } + // this is associated to an entity; destroy only if the entity is unmanaged + return Entities.isManaged(associatedEntity); + } + + private boolean isChild(Task task) { + Task parent = task.getSubmittedByTask(); + return (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)); + } + + private boolean isSubmitterExpired(Task task) { + if (Strings.isBlank(task.getSubmittedByTaskId())) { + return false; + } + Task submitter = task.getSubmittedByTask(); + if (submitter!=null && (!submitter.isDone() || executionManager.getTask(submitter.getId())!=null)) { + return false; + } + // submitter task is GC'd + return true; + } + protected enum TagCategory { ENTITY, NON_ENTITY_NORMAL; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java index 97c7bacbf8..e192edd29c 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java @@ -38,10 +38,12 @@ import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.EntityAndItem; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements.StringAndArgument; import org.apache.brooklyn.core.mgmt.internal.NonDeploymentManagementContext.NonDeploymentManagementContextMode; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,8 +84,8 @@ public EntityManagementSupport(AbstractEntity entity) { protected transient ManagementContext initialManagementContext; protected transient ManagementContext managementContext; - protected transient SubscriptionContext subscriptionContext; - protected transient ExecutionContext executionContext; + protected transient volatile SubscriptionContext subscriptionContext; + protected transient volatile ExecutionContext executionContext; protected final AtomicBoolean managementContextUsable = new AtomicBoolean(false); protected final AtomicBoolean currentlyDeployed = new AtomicBoolean(false); @@ -159,9 +161,10 @@ public void onRebind(ManagementTransitionInfo info) { } public void onManagementStarting(ManagementTransitionInfo info) { - try { - // TODO same-thread task on this entity, with internal tag ? - synchronized (this) { + info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management starting") + .dynamic(false) + .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) + .body(() -> { try { synchronized (this) { boolean alreadyManaging = isDeployed(); if (alreadyManaging) { @@ -212,13 +215,15 @@ public void onManagementStarting(ManagementTransitionInfo info) { } catch (Throwable t) { managementFailed.set(true); throw Exceptions.propagate(t); - } + }}).build() ); } @SuppressWarnings("deprecation") public void onManagementStarted(ManagementTransitionInfo info) { - try { - synchronized (this) { + info.getManagementContext().getExecutionContext(entity).get( Tasks.builder().displayName("Management started") + .dynamic(false) + .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) + .body(() -> { try { synchronized (this) { boolean alreadyManaged = isFullyManaged(); if (alreadyManaged) { @@ -265,7 +270,7 @@ public void onManagementStarted(ManagementTransitionInfo info) { } catch (Throwable t) { managementFailed.set(true); throw Exceptions.propagate(t); - } + }}).build() ); } @SuppressWarnings("deprecation") @@ -352,19 +357,25 @@ public synchronized ManagementContext getManagementContext() { return (managementContextUsable.get()) ? managementContext : nonDeploymentManagementContext; } - public synchronized ExecutionContext getExecutionContext() { + public ExecutionContext getExecutionContext() { if (executionContext!=null) return executionContext; if (managementContextUsable.get()) { - executionContext = managementContext.getExecutionContext(entity); - return executionContext; + synchronized (this) { + if (executionContext!=null) return executionContext; + executionContext = managementContext.getExecutionContext(entity); + return executionContext; + } } return nonDeploymentManagementContext.getExecutionContext(entity); } - public synchronized SubscriptionContext getSubscriptionContext() { + public SubscriptionContext getSubscriptionContext() { if (subscriptionContext!=null) return subscriptionContext; if (managementContextUsable.get()) { - subscriptionContext = managementContext.getSubscriptionContext(entity); - return subscriptionContext; + synchronized (this) { + if (subscriptionContext!=null) return subscriptionContext; + subscriptionContext = managementContext.getSubscriptionContext(entity); + return subscriptionContext; + } } return nonDeploymentManagementContext.getSubscriptionContext(entity); } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java index 2349c735e9..a7260597c2 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java @@ -42,6 +42,7 @@ import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.BasicSensorEvent; import org.apache.brooklyn.util.collections.MutableList; @@ -53,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; @@ -105,6 +107,7 @@ protected synchronized SubscriptionHandle subscribe(Map flag Entity producer = s.producer; Sensor sensor= s.sensor; s.subscriber = getSubscriber(flags, s); + s.subscriptionDescription = getSubscriptionDescription(flags, s); if (flags.containsKey("tags") || flags.containsKey("tag")) { Iterable tags = (Iterable) flags.get("tags"); Object tag = flags.get("tag"); @@ -127,15 +130,9 @@ protected synchronized SubscriptionHandle subscribe(Map flag if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this}); allSubscriptions.put(s.id, s); - addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s); - if (s.subscriber!=null) { - addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s); - } - if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) { - ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class); - } - + T lastVal; if (notifyOfInitialValue) { + notifyOfInitialValue = false; if (producer == null) { LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s); } else if (sensor == null) { @@ -143,11 +140,58 @@ protected synchronized SubscriptionHandle subscribe(Map flag } else if (!(sensor instanceof AttributeSensor)) { LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s); } else { - if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s}); - T val = (T) s.producer.getAttribute((AttributeSensor) s.sensor); - submitPublishEvent(s, new BasicSensorEvent(s.sensor, s.producer, val), true); + notifyOfInitialValue = true; } } + if (notifyOfInitialValue) { + lastVal = (T) s.producer.sensors().get((AttributeSensor) s.sensor); + } else { + lastVal = null; // won't be used + } + addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s); + if (s.subscriber!=null) { + addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s); + } + if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) { + ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class); + } + + if (notifyOfInitialValue) { + if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s}); + // this is run asynchronously to prevent deadlock when trying to get attribute and publish; + // however we want it: + // (a) to run in the same order as subscriptions are made, so use the manager tag scheduler + // (b) ideally to use the last value that was not published to this target, and + // (c) to deliver before any subsequent value notification + // but we can't guarantee either (b) or (c) without taking a lock from before we added + // the subscriber above, mutexing other sets and publications, which feels heavy and dangerous. + // so the compromise is to skip this delivery in cases where the last value has obviously changed - + // because a more recent notification is guaranteed to be sent. + // we may occasionally still send a duplicate, if delivery got sent in the tiny + // window between adding the subscription and taking the last value, + // we will think the last value hasn't changed. but we will never send a + // wrong value as this backs out if there is any confusion over the last value. + em.submit( + MutableMap.of("tags", getPublishTags(s, s.producer), + "displayName", "Initial value publication on subscription to "+s.sensor.getName()), + () -> { + T val = (T) s.producer.sensors().get((AttributeSensor) s.sensor); + if (!Objects.equal(lastVal, val)) { + // bail out - value has been changed; + // this might be a duplicate if value changed in small window earlier, + // but it won't be delivering an old value later than a newer value + if (LOG.isDebugEnabled()) LOG.debug("skipping initial value delivery of {} -> {} to {} as value changed from {} to {}", new Object[] {s.producer, s.sensor, s, lastVal, val}); + return; + } + // guard against case where other thread changes the val and publish + // while we are publishing, and our older val is then delivered after theirs. + // 2017-10 previously we did not do this, then looked at doing it with the attribute lock object + // synchronized (((AbstractEntity)s.producer).getAttributesSynchObjectInternal()) { + // but realized a better thing is to have initial delivery _done_, not just submitted, + // by ourselves, as we are already in the right thread now and can prevent interleaving this way + submitPublishEvent(s, new BasicSensorEvent(s.sensor, s.producer, val), true); + }); + } return s; } @@ -203,7 +247,6 @@ public void publish(final SensorEvent event) { // (recommend exactly one per subscription to prevent deadlock) // this is done with: // em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class); - //note, generating the notifications must be done in the calling thread to preserve order //e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event); @@ -221,16 +264,11 @@ public void publish(final SensorEvent event) { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private void submitPublishEvent(final Subscription s, final SensorEvent event, final boolean isInitial) { + private void submitPublishEvent(final Subscription s, final SensorEvent event, final boolean isInitialPublicationOfOldValueInCorrectScheduledThread) { if (s.eventFilter!=null && !s.eventFilter.apply(event)) return; - List tags = MutableList.builder() - .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags) - .add(s.subscriberExecutionManagerTag) - .add(BrooklynTaskTags.SENSOR_TAG) - .build() - .asUnmodifiable(); + List tags = getPublishTags(s, event.getSource()).asUnmodifiable(); StringBuilder name = new StringBuilder("sensor "); StringBuilder description = new StringBuilder("Sensor "); @@ -247,6 +285,10 @@ private void submitPublishEvent(final Subscription s, final SensorEvent event description.append(sourceName==null ? "" : sourceName); description.append(" publishing to "); description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId() : s.subscriber); + if (Strings.isNonBlank(s.subscriptionDescription)) { + description.append(", "); + description.append(s.subscriptionDescription); + } if (includeDescriptionForSensorTask(event)) { name.append(" "); @@ -258,10 +300,12 @@ private void submitPublishEvent(final Subscription s, final SensorEvent event "displayName", name.toString(), "description", description.toString()); - em.submit(execFlags, new Runnable() { + boolean isEntityStarting = s.subscriber instanceof Entity && isInitialPublicationOfOldValueInCorrectScheduledThread; + // will have entity (and adjunct) execution context from tags, so can skip getting exec context + Runnable deliverer = new Runnable() { @Override public String toString() { - if (isInitial) { + if (isInitialPublicationOfOldValueInCorrectScheduledThread) { return "LSM.publishInitial("+event+")"; } else { return "LSM.publish("+event+")"; @@ -270,6 +314,22 @@ public String toString() { @Override public void run() { try { + if (isEntityStarting) { + /* don't let sub deliveries start until this is completed; + * this is a pragmatic way to ensure the publish events + * if submitted during management starting, aren't executed + * until after management is starting. + * without this we can get deadlocks as this goes to publish, + * has the attribute sensors lock, and waits on the publish lock + * (any of management support, local subs, queueing subs). + * meanwhile the management startup has those three locks, + * then goes to publish and in the process looks up a sensor value. + * usually this is not an issue because some other task + * does something (eg entity.getExecutionContext()) which + * also has a wait-on-management-support semantics. + */ + synchronized (((EntityInternal)s.subscriber).getManagementSupport()) {} + } int count = s.eventCount.incrementAndGet(); if (count > 0 && count % 1000 == 0) LOG.debug("{} events for subscriber {}", count, s); @@ -281,7 +341,26 @@ public void run() { LOG.warn("Error processing subscriptions to "+this+": "+t, t); } } - }}); + }}; + if (!isInitialPublicationOfOldValueInCorrectScheduledThread) { + em.submit(execFlags, deliverer); + } else { + // for initial, caller guarantees he is running in the right thread/context + // where the above submission would take place, typically the + // subscriber single threaded executor with the entity context; + // this allows caller to do extra assertions and bailout steps at the right time + deliverer.run(); + } + } + + private MutableList getPublishTags(final Subscription s, final Entity source) { + return MutableList.builder() + .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags) + .add(s.subscriberExecutionManagerTag) + .add(BrooklynTaskTags.SENSOR_TAG) + // associate the publish event with the publisher (though on init it might be triggered by subscriber) + .addIfNotNull(source!=null ? BrooklynTaskTags.tagForTargetEntity(source) : null) + .build(); } protected boolean includeDescriptionForSensorTask(SensorEvent event) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java index bf2c498f1d..b5fd0b584d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java @@ -58,6 +58,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; import org.apache.brooklyn.api.objs.BrooklynObject; +import org.apache.brooklyn.api.objs.EntityAdjunct; import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry; import org.apache.brooklyn.config.StringConfigMap; import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; @@ -101,7 +102,6 @@ public boolean isPreManaged() { private ManagementContextInternal initialManagementContext; private final QueueingSubscriptionManager qsm; - private final BasicSubscriptionContext subscriptionContext; private NonDeploymentEntityManager entityManager; private NonDeploymentLocationManager locationManager; private NonDeploymentAccessManager accessManager; @@ -112,10 +112,6 @@ public NonDeploymentManagementContext(AbstractEntity entity, NonDeploymentManage this.mode = checkNotNull(mode, "mode"); qsm = new QueueingSubscriptionManager(); - // For subscription flags, see AbstractManagementContext.getSubscriptionContext. This is - // needed for callbacks, to ensure the correct entity context is set. - Map subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity))); - subscriptionContext = new BasicSubscriptionContext(subscriptionFlags, qsm, entity); entityManager = new NonDeploymentEntityManager(null); locationManager = new NonDeploymentLocationManager(null); accessManager = new NonDeploymentAccessManager(null); @@ -254,7 +250,19 @@ public synchronized SubscriptionContext getSubscriptionContext(Entity entity) { if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity); if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED) throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available"); - return subscriptionContext; + // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set + Map subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity))); + return new BasicSubscriptionContext(subscriptionFlags, qsm, entity); + } + + @Override + public SubscriptionContext getSubscriptionContext(Entity entity, EntityAdjunct adjunct) { + if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity); + if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED) + throw new IllegalStateException("Entity "+entity+" is no longer managed; subscription context not available"); + // see also AbstractManagementContext.getSubscriptionContext - needed for callbacks, to ensure the correct entity context is set + Map subscriptionFlags = ImmutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(entity), BrooklynTaskTags.tagForContextAdjunct(adjunct))); + return new BasicSubscriptionContext(subscriptionFlags, qsm, entity); } @Override @@ -273,6 +281,15 @@ public ExecutionContext getExecutionContext(Entity entity) { return initialManagementContext.getExecutionContext(entity); } + @Override + public ExecutionContext getExecutionContext(Entity entity, EntityAdjunct adjunct) { + if (!this.entity.equals(entity)) throw new IllegalStateException("Non-deployment context "+this+" can only use a single Entity: has "+this.entity+", but passed "+entity); + if (mode==NonDeploymentManagementContextMode.MANAGEMENT_STOPPED) + throw new IllegalStateException("Entity "+entity+" is no longer managed; execution context not available"); + checkInitialManagementContextReal(); + return initialManagementContext.getExecutionContext(entity, adjunct); + } + @Override public ExecutionContext getServerExecutionContext() { return initialManagementContext.getServerExecutionContext(); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java index 83facda03e..57ad073cd7 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/QueueingSubscriptionManager.java @@ -55,6 +55,7 @@ protected synchronized SubscriptionHandle subscribe(Map flag QueuedSubscription qs = new QueuedSubscription(); qs.flags = flags; s.subscriber = getSubscriber(flags, s); + s.subscriptionDescription = getSubscriptionDescription(flags, s); qs.s = s; queuedSubscriptions.add(qs); return s; @@ -76,7 +77,7 @@ public void setDelegate(AbstractSubscriptionManager delegate) { @SuppressWarnings("unchecked") public synchronized void startDelegatingForSubscribing() { - // TODO wrap in same-thread task + // could wrap in same-thread task, but there's enough context without it assert delegate!=null; for (QueuedSubscription s: queuedSubscriptions) { delegate.subscribe(s.flags, s.s); @@ -87,7 +88,7 @@ public synchronized void startDelegatingForSubscribing() { @SuppressWarnings("unchecked") public synchronized void startDelegatingForPublishing() { - // TODO wrap in same-thread task + // could wrap in same-thread task, but there's enough context without it assert delegate!=null; for (SensorEvent evt: queuedSensorEvents) { delegate.publish(evt); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java index 5e71701b4d..1cf0ad7cd1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/Subscription.java @@ -37,6 +37,7 @@ class Subscription implements SubscriptionHandle { public Object subscriberExecutionManagerTag; /** whether the tag was supplied by user, in which case we should not clear execution semantics */ public boolean subscriberExecutionManagerTagSupplied; + public String subscriptionDescription; public Iterable subscriberExtraExecTags; public final Entity producer; public final Sensor sensor; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 9e53add9ea..0830900d79 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -490,15 +490,8 @@ public List rebind(ClassLoader classLoaderO, RebindExceptionHandler ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); if (ec == null) { ec = managementContext.getServerExecutionContext(); - Task> task = ec.submit(new Callable>() { - @Override public List call() throws Exception { - return rebindImpl(classLoader, exceptionHandler, mode); - }}); - try { - return task.get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } + return ec.get(Tasks.>builder().displayName("rebind").dynamic(false) + .body(() -> rebindImpl(classLoader, exceptionHandler, mode)).build()); } else { return rebindImpl(classLoader, exceptionHandler, mode); } diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java index 460c8c44c2..f67f1f5dac 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java @@ -106,8 +106,7 @@ public T call() { } }; - // TODO can we remove the DST ? this is structured so maybe not - Task t = Tasks.builder().body(job) + Task t = Tasks.builder().dynamic(false).body(job) .displayName("Resolving config "+key.getName()) .description("Internal non-blocking structured key resolution") .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) @@ -126,20 +125,13 @@ public T call() { * See {@link #getNonBlockingResolvingStructuredKey(ConfigKey)}. */ protected Maybe getNonBlockingResolvingSimple(ConfigKey key) { - // TODO See AbstractConfigMapImpl.getConfigImpl, for how it looks up the "container" of the - // key, so that it gets the right context entity etc. - - // getRaw returns Maybe(val) if the key was explicitly set (where val can be null) - // or Absent if the config key was unset. Object unresolved = getRaw(key).or(key.getDefaultValue()); - // TODO add description that we are evaluating this config key to be used if the code below submits futher tasks - // and look at other uses of "description" method - // and make sure it is marked transient Maybe resolved = Tasks.resolving(unresolved) .as(Object.class) .immediately(true) .deep(true) .context(getContext()) + .description("Resolving raw value of simple config "+key) .getMaybe(); if (resolved.isAbsent()) return Maybe.Absent.castAbsent(resolved); diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index 2c4c4b4662..7b2f6ad030 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -37,6 +36,7 @@ import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.Group; import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.objs.BrooklynObject; @@ -86,6 +86,8 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple @Deprecated protected Map leftoverProperties = Maps.newLinkedHashMap(); + /** @deprecated since 0.13.0, going private, use {@link #getExecutionContext()} */ + @Deprecated protected transient ExecutionContext execution; private final BasicConfigurationSupport config = new BasicConfigurationSupport(); @@ -213,6 +215,14 @@ protected boolean isLegacyNoConstructionInit() { return _legacyNoConstructionInit; } + /** If the entity has been set, returns the execution context indicating this adjunct. + * Primarily intended for this adjunct to execute tasks, but in some cases, mainly low level, + * it may make sense for other components to execute tasks against this adjunct. */ + @Beta + public ExecutionContext getExecutionContext() { + return execution; + } + @Override public ConfigurationSupportInternal config() { return config; @@ -276,7 +286,7 @@ protected SubscriptionTracker getSubscriptionTracker() { synchronized (AbstractEntityAdjunct.this) { if (_subscriptionTracker!=null) return _subscriptionTracker; if (entity==null) return null; - _subscriptionTracker = new SubscriptionTracker(((EntityInternal)entity).subscriptions().getSubscriptionContext()); + _subscriptionTracker = new SubscriptionTracker(getManagementContext().getSubscriptionContext(entity, AbstractEntityAdjunct.this)); return _subscriptionTracker; } } @@ -334,7 +344,7 @@ public void refreshInheritedConfigOfChildren() { @Override protected ExecutionContext getContext() { - return AbstractEntityAdjunct.this.execution; + return AbstractEntityAdjunct.this.getExecutionContext(); } @Override @@ -402,10 +412,20 @@ public void setDisplayName(String name) { this.name = name; } + @Override + public ManagementContext getManagementContext() { + ManagementContext result = super.getManagementContext(); + if (result!=null) return result; + if (entity!=null) { + return ((EntityInternal)entity).getManagementContext(); + } + return null; + } + public void setEntity(EntityLocal entity) { if (destroyed.get()) throw new IllegalStateException("Cannot set entity on a destroyed entity adjunct"); this.entity = entity; - this.execution = ((EntityInternal) entity).getExecutionContext(); + this.execution = getManagementContext().getExecutionContext(entity, this); if (entity!=null && getCatalogItemId() == null) { setCatalogItemIdAndSearchPath(entity.getCatalogItemId(), entity.getCatalogItemIdSearchPath()); } diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java index 602d943ffb..71fe16c4eb 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AdjunctConfigMap.java @@ -69,9 +69,7 @@ protected void postSetConfig() { /* noop */ } @Override protected ExecutionContext getExecutionContext(BrooklynObject bo) { - // TODO expose ((AbstractEntityAdjunct)bo).execution ? - Entity entity = ((AbstractEntityAdjunct)bo).entity; - return (entity != null) ? ((EntityInternal)entity).getExecutionContext() : null; + return ((AbstractEntityAdjunct)bo).getExecutionContext(); } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java index 6ad42f4156..972612bc60 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynObjectInternal.java @@ -96,7 +96,9 @@ public interface ConfigurationSupportInternal extends Configurable.Configuration /** * Returns the uncoerced value for this config key, if available, not taking any default. * If there is no local value and there is an explicit inherited value, will return the inherited. + * May return a {@link Maybe}-wrapped null if the value is explicitly null. * Returns {@link Maybe#absent()} if the key is not explicitly set on this object or an ancestor. + * Often this is used with {@link Maybe#or(Object))} to return default value. *

    * See also {@link #getLocalRaw(ConfigKey). */ diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java index daaf18b3dc..4b6e704994 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/EntityProxyImpl.java @@ -209,7 +209,7 @@ public Object invoke(Object proxy, final Method m, final Object[] args) throws T TaskAdaptable task = ((EffectorWithBody)eff).getBody().newTask(delegate, eff, ConfigBag.newInstance(parameters)); // as per LocalManagementContext.runAtEntity(Entity entity, TaskAdaptable task) TaskTags.markInessential(task); - result = DynamicTasks.queueIfPossible(task.asTask()).orSubmitAsync(delegate).andWaitForSuccess(); + result = DynamicTasks.get(task.asTask(), delegate); } else { result = m.invoke(delegate, nonNullArgs); } diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java index a14225ba59..bd6cc1d4de 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java @@ -309,7 +309,7 @@ protected void initEntityAndDescendants(String entityId, fina * which currently show up at the top level once the initializer task completes. * TODO It would be nice if these schedule tasks were grouped in a bucket! */ - ((EntityInternal)entity).getExecutionContext().submit(Tasks.builder().dynamic(false).displayName("Entity initialization") + ((EntityInternal)entity).getExecutionContext().get(Tasks.builder().dynamic(false).displayName("Entity initialization") .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) .body(new Runnable() { @Override @@ -354,7 +354,7 @@ public void run() { initEntityAndDescendants(child.getId(), entitiesByEntityId, specsByEntityId); } } - }).build()).getUnchecked(); + }).build()); } /** diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java index dee070048a..d9da6ae874 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java @@ -139,9 +139,18 @@ private void checkPath(Collection path) { } public T update(AttributeSensor attribute, T newValue) { - T oldValue = updateWithoutPublishing(attribute, newValue); - entity.emitInternal(attribute, newValue); - return oldValue; + // 2017-10 this was unsynched which meant if two threads updated + // the last publication would not correspond to the last value. + // could introduce deadlock but emit internal and publish should + // not seek any locks. _subscribe_ and _delivery_ might, but they + // won't be in this block. an issue with _subscribe-and-get-initial_ + // should be resolved by initial subscription queueing the publication + // to a context where locks are not held. + synchronized (values) { + T oldValue = updateWithoutPublishing(attribute, newValue); + entity.emitInternal(attribute, newValue); + return oldValue; + } } public T updateWithoutPublishing(AttributeSensor attribute, T newValue) { diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index 23b48f0ce3..b9a9041f96 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@ -827,10 +827,8 @@ protected Collection shrink(int delta) { for (Entity member : removedStartables) { tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap())); } - Task invoke = Tasks.parallel(tasks.build()); - DynamicTasks.queueIfPossible(invoke).orSubmitAsync(); try { - invoke.get(); + DynamicTasks.get( Tasks.parallel(tasks.build()) ); return removedEntities; } catch (Exception e) { throw Exceptions.propagate(e); @@ -1075,8 +1073,7 @@ protected void stopAndRemoveNode(Entity member) { try { if (member instanceof Startable) { Task task = newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()); - DynamicTasks.queueIfPossible(task).orSubmitAsync(); - task.getUnchecked(); + DynamicTasks.get(task); } } finally { Entities.unmanage(member); diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java index ea2ec5541f..373c667f7a 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java @@ -186,7 +186,7 @@ private void execute(Entity target, String command, String type, String memberId // Try to resolve the configuration in the env Map try { - env = (Map) Tasks.resolveDeepValue(env, Object.class, ((EntityInternal) entity).getExecutionContext()); + env = (Map) Tasks.resolveDeepValue(env, Object.class, getExecutionContext()); } catch (InterruptedException | ExecutionException e) { throw Exceptions.propagate(e); } diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java index ca4130444f..3f78c3a5a0 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java @@ -229,7 +229,7 @@ protected void preStart() { final ProcessTaskFactory taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir, pollInfo.input, pollInfo.context, pollInfo.timeout); - final ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext(); + final ExecutionContext executionContext = getExecutionContext(); getPoller().scheduleAtFixedRate( new Callable() { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java index e7debb9cca..424bbfcedb 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/AbstractExecutionContext.java @@ -25,6 +25,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.util.collections.MutableMap; import com.google.common.collect.Maps; @@ -41,11 +42,16 @@ public abstract class AbstractExecutionContext implements ExecutionContext { public Task submit(Map properties, Runnable runnable) { return submitInternal(properties, runnable); } /** @see #submit(Map, Runnable) */ - @Override + @Override + public Task submit(String displayName, Runnable runnable) { return submitInternal(MutableMap.of("displayName", displayName), runnable); } + @Override @Deprecated public Task submit(Runnable runnable) { return submitInternal(Maps.newLinkedHashMap(), runnable); } + /** @see #submit(Map, Runnable) */ - @Override + @Override + public Task submit(String displayName, Callable callable) { return submitInternal(MutableMap.of("displayName", displayName), callable); } + @Override @Deprecated public Task submit(Callable callable) { return submitInternal(Maps.newLinkedHashMap(), callable); } /** @see #submit(Map, Runnable) */ diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 435e50e9f8..2c8ddab340 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -44,6 +44,7 @@ import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; @@ -83,7 +84,7 @@ public BasicExecutionContext(ExecutionManager executionManager) { * Supported flags are {@code tag} and {@code tags} * * @see ExecutionManager#submit(Map, TaskAdaptable) - * @deprecated since 0.12.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)} + * @deprecated since 0.13.0 use {@link #BasicExecutionContext(ExecutionManager, Iterable)} */ @Deprecated public BasicExecutionContext(Map flags, ExecutionManager executionManager) { @@ -107,8 +108,8 @@ public BasicExecutionContext(ExecutionManager executionManager, Iterable tags // which may require access to internal methods // (could remove this check if generalizing; it has been here for a long time and the problem seems gone) for (Object tag: tags) { - if (tag instanceof BrooklynTaskTags.WrappedEntity) { - if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) { + if (tag instanceof BrooklynTaskTags.WrappedItem) { + if (Proxy.isProxyClass(((WrappedItem)tag).unwrap().getClass())) { log.warn(""+this+" has entity proxy in "+tag); } } @@ -242,6 +243,11 @@ private Maybe runInSameThread(final Task task, Callable> job) } } + @Override + public Maybe getImmediately(Task callableOrSupplier) { + return getImmediately((Object) callableOrSupplier); + } + /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context; * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances; * with tasks if it is submitted or in progress, @@ -281,17 +287,6 @@ public Maybe getImmediately(Object callableOrSupplier) { try { return runInSameThread(fakeTaskForContext, new Callable>() { public Maybe call() { - // could try to make this work for more types of tasks by not cancelling, just interrupting; - // however there is a danger that immediate-submission tasks are leaked if we don't cancel. - // for instance with DSTs the thread interrupt may apply only to the main job queue.andWait blocking, - // leaving other tasks leaked. - // - // this method is best-effort so fine if it doesn't succeed. good if we can expand - // coverage but NOT at the expense of major leaks of course! - // - // see WIP test in EffectorSayHiTest - fakeTaskForContext.cancel(); - boolean wasAlreadyInterrupted = Thread.interrupted(); try { return job.getImmediately(); @@ -299,6 +294,13 @@ public Maybe call() { if (wasAlreadyInterrupted) { Thread.currentThread().interrupt(); } + // we've acknowledged that getImmediate may wreck (cancel) the task, + // their first priority is to prevent them from leaking; + // however previously we did the cancel before running, + // doing it after means more tasks successfully execute + // (the interrupt is sufficient to prevent them blocking); + // see test EffectorSayHiTest.testInvocationGetImmediately + fakeTaskForContext.cancel(); } } }); } catch (Exception e) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index e71b43efa3..c53277d450 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -57,6 +57,7 @@ import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; @@ -387,10 +388,12 @@ public Set getTaskTags() { } } - @Override public Task submit(Runnable r) { return submit(new LinkedHashMap(1), r); } + @Override @Deprecated public Task submit(Runnable r) { return submit(new LinkedHashMap(1), r); } + @Override public Task submit(String displayName, Runnable r) { return submit(MutableMap.of("displayName", displayName), r); } @Override public Task submit(Map flags, Runnable r) { return submit(flags, new BasicTask(flags, r)); } - @Override public Task submit(Callable c) { return submit(new LinkedHashMap(1), c); } + @Override @Deprecated public Task submit(Callable c) { return submit(new LinkedHashMap(1), c); } + @Override public Task submit(String displayName, Callable c) { return submit(MutableMap.of("displayName", displayName), c); } @Override public Task submit(Map flags, Callable c) { return submit(flags, new BasicTask(flags, c)); } @Override public Task submit(TaskAdaptable t) { return submit(new LinkedHashMap(1), t); } @@ -796,7 +799,11 @@ private Task gone() { Task t = Tasks.builder().dynamic(false).displayName(displayName+" (placeholder for "+id+")") .description("Details of the original task have been forgotten.") .body(Callables.returning((T)null)).build(); - ((BasicTask)t).ignoreIfNotRun(); + // don't really want anyone executing the "gone" task... + // also if we are GC'ing tasks then cancelled may help with cleanup + // of sub-tasks that have lost their submitted-by-task reference ? + // also don't want warnings when it's finalized, this means we don't need ignoreIfNotRun() + ((BasicTask)t).cancelled = true; return t; } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java index 15b062adf0..9026798c5b 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicTasks.java @@ -41,6 +41,16 @@ /** * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks. + *

    + * Queueing is supported by some task contexts (eg {@link DynamicSequentialTask}) to let that task + * build up a complex sequence of tasks and logic. This utility class gives conveniences to allow: + *

    + *

  • "queue-if-possible-else-submit-async", so that it is backgrounded, using queueing semantics if available; + *
  • "queue-if-possible-else-submit-blocking", so that it is in the queue if there is one, else it will complete synchronously; + *
  • "queue-if-possible-else-submit-and-in-both-cases-block", so that it is returned immediately, but waits in its queue if there is one. + *

    + * Over time the last mode has been the most prevalent and {@link #get(TaskAdaptable)} is introduced here + * as a convenience. If a timeout is desired then the first should be used. * * @since 0.6.0 */ @@ -107,7 +117,7 @@ public TaskQueueingResult executionContext(Entity entity) { this.execContext = ((EntityInternal)entity).getExecutionContext(); return this; } - private boolean orSubmitInternal() { + private boolean orSubmitInternal(boolean samethread) { if (!wasQueued()) { if (isQueuedOrSubmitted()) { log.warn("Redundant call to execute "+getTask()+"; skipping"); @@ -118,43 +128,58 @@ private boolean orSubmitInternal() { ec = BasicExecutionContext.getCurrentExecutionContext(); if (ec==null) throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext"); - ec.submit(getTask()); + if (samethread) ec.get(getTask()); + else ec.submit(getTask()); return true; } } else { return false; } } - /** causes the task to be submitted (asynchronously) if it hasn't already been, - * requiring an entity execution context (will try to find a default if not set) */ + /** Causes the task to be submitted (asynchronously) if it hasn't already been, + * such as if a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context. + *

    + * An {@link #executionContext(ExecutionContext)} should typically have been set + * (or use {@link #orSubmitAsync(Entity)}). + */ public TaskQueueingResult orSubmitAsync() { - orSubmitInternal(); + orSubmitInternal(false); return this; } - /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */ + /** Convenience for setting {@link #executionContext(Entity)} then {@link #orSubmitAsync()}. */ public TaskQueueingResult orSubmitAsync(Entity entity) { executionContext(entity); return orSubmitAsync(); } - /** causes the task to be submitted *synchronously* if it hasn't already been submitted; - * useful in contexts such as libraries where callers may be either on a legacy call path - * (which assumes all commands complete immediately); - * requiring an entity execution context (will try to find a default if not set) */ + /** Alternative to {@link #orSubmitAsync()} but where, if the submission is needed + * (usually because a previous {@link DynamicTasks#queueIfPossible(TaskAdaptable)} did not have a queueing context) + * it will wait until execution completes (and in fact will execute the task in this thread, + * as per {@link ExecutionContext#get(TaskAdaptable)}. + *

    + * If the task is already queued, this method does nothing, not even blocks, + * to permit cases where a caller is building up a set of tasks to be executed sequentially: + * with a queueing context the caller can line them all up, but without that the caller needs this task + * finished before submitting subsequent tasks. + *

    + * If blocking is desired in all cases and this call should fail on task failure, invoke {@link #andWaitForSuccess()} on the result, + * or consider using {@link DynamicTasks#get(TaskAdaptable)} instead of this method, + * or {@link DynamicTasks#get(TaskAdaptable, Entity)} if an execuiton context a la {@link #orSubmitAndBlock(Entity)} is needed. */ public TaskQueueingResult orSubmitAndBlock() { - if (orSubmitInternal()) task.getUnchecked(); + orSubmitInternal(true); return this; } - /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */ + /** Variant of {@link #orSubmitAndBlock()} doing what {@link #orSubmitAsync(Entity)} does for {@link #orSubmitAsync()}. */ public TaskQueueingResult orSubmitAndBlock(Entity entity) { executionContext(entity); return orSubmitAndBlock(); } - /** blocks for the task to be completed + /** Blocks for the task to be completed, throwing if there are any errors + * and otherwise returning the value. *

    - * needed in any context where subsequent commands assume the task has completed. + * In addition to cases where a result is wanted, this is needed in any context where subsequent commands assume the task has completed. * not needed in a context where the task is simply being built up and queued. *

    - * throws if there are any errors + * */ public T andWaitForSuccess() { return task.getUnchecked(); @@ -169,16 +194,12 @@ public void orCancel() { /** * Tries to add the task to the current addition context if there is one, otherwise does nothing. *

    - * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned + * Call {@link TaskQueueingResult#orSubmitAsync()} on the returned * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a * {@link BasicExecutionContext}. */ public static TaskQueueingResult queueIfPossible(TaskAdaptable task) { - TaskQueueingContext adder = getTaskQueuingContext(); - boolean result = false; - if (adder!=null) - result = Tasks.tryQueueing(adder, task); - return new TaskQueueingResult(task, result); + return new TaskQueueingResult(task, Tasks.tryQueueing(getTaskQueuingContext(), task)); } /** @see #queueIfPossible(TaskAdaptable) */ @@ -189,22 +210,17 @@ public static TaskQueueingResult queueIfPossible(TaskFactory - * throws if it cannot add */ + * throws if it cannot add or addition/execution would fail including if calling thread is interrupted */ public static Task queueInTaskHierarchy(Task task) { Preconditions.checkNotNull(task, "Task to queue cannot be null"); Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task); - TaskQueueingContext adder = getTaskQueuingContext(); - if (adder!=null) { - if (Tasks.tryQueueing(adder, task)) { - log.debug("Queued task {} at context {} (no hierarchy)", task, adder); - return task; - } + if (Tasks.tryQueueing(getTaskQueuingContext(), task)) { + log.debug("Queued task {} at context {} (no hierarchy)", task, getTaskQueuingContext()); + return task; } - Task t = Tasks.current(); - Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task); - + Task t = Tasks.current(); while (t!=null) { if (t instanceof TaskQueueingContext) { if (Tasks.tryQueueing((TaskQueueingContext)t, task)) { @@ -288,12 +304,9 @@ public static > T queueIfNeeded(T task) { return task; } - /** submits/queues the given task if needed, and gets the result (unchecked) - * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */ - // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks! + /** submits/queues the given task if needed, and gets the result (unchecked) */ public static T get(TaskAdaptable t) { - // TODO do in foreground? - return queueIfNeeded(t).asTask().getUnchecked(); + return queueIfPossible(t).orSubmitAndBlock().andWaitForSuccess(); } /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error @@ -334,6 +347,11 @@ public static void markInessential() { public static Task submit(TaskAdaptable task, Entity entity) { return queueIfPossible(task).orSubmitAsync(entity).asTask(); } + + /** queues the task if possible and waits for the result, otherwise executes synchronously as per {@link ExecutionContext#get(TaskAdaptable)} */ + public static T get(TaskAdaptable task, Entity e) { + return queueIfPossible(task).orSubmitAndBlock(e).andWaitForSuccess(); + } /** Breaks the parent-child relation between Tasks.current() and the task passed, * making the new task a top-level one at the target entity. diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java index 12247c4962..90a6bdc3b8 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java @@ -278,11 +278,11 @@ public static boolean isQueuedOrSubmitted(TaskAdaptable task) { } /** - * Adds the given task to the given context. Does not throw an exception if the addition fails. - * @return true if the task was added, false otherwise. + * Adds the given task to the given context. Does not throw an exception if the addition fails or would fail. + * @return true if the task was added, false otherwise including if context is null or thread is interrupted. */ public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable task) { - if (task==null || isQueued(task)) + if (task==null || adder==null || isQueued(task) || Thread.currentThread().isInterrupted()) return false; try { adder.queue(task.asTask()); diff --git a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java index 9decca882c..c2cf908558 100644 --- a/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/config/DeferredConfigTest.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import java.util.List; -import java.util.concurrent.Callable; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.Sensor; @@ -55,13 +54,10 @@ public void testDeferredConfigInListNotAvailable() throws Exception { void doTestDeferredConfigInList(final boolean delay) throws Exception { // Simulate a deferred value - Task> sensorFuture = app.getExecutionContext().submit(new Callable>() { - @Override - public Sensor call() throws Exception { + Task> sensorFuture = app.getExecutionContext().submit("deferred return sensor", () -> { if (delay) Time.sleep(Duration.FIVE_SECONDS); return TestApplication.MY_ATTRIBUTE; - } - }); + }); app.config().set(SENSORS_UNTYPED, (Object)ImmutableList.of(sensorFuture)); if (!delay) sensorFuture.get(Duration.ONE_SECOND); diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java index bc41d45028..4b991adf8a 100644 --- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java @@ -61,6 +61,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport { //TODO test edge/error conditions //(missing parameters, wrong number of params, etc) + @SuppressWarnings("unused") private static final Logger log = LoggerFactory.getLogger(EffectorSayHiTest.class); private MyEntity e; @@ -109,11 +110,10 @@ public void testInvocationGet() throws Exception { .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob"); } - @Test(groups="WIP") // see comments at BasicExecutionContext.getImmediately - // TODO this will be fixed soon by #835 + @Test public void testInvocationGetImmediately() throws Exception { assertEquals(((EntityInternal)e).getExecutionContext() - .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ), "hi Bob"); + .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob", "greeting", "hi")) ).get(), "hi Bob"); } @Test diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java index e8a3eee491..f301b3c04b 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/ApplicationLifecycleStateTest.java @@ -267,20 +267,14 @@ public void onEvent(SensorEvent event) { } }); - Task first = mgmt.getExecutionManager().submit(new Runnable() { - @Override - public void run() { + Task first = mgmt.getExecutionManager().submit("setting test sensor", () -> { app.sensors().set(TEST_SENSOR, "first"); log.debug("set first"); - } - }); - Task second = mgmt.getExecutionManager().submit(new Runnable() { - @Override - public void run() { + }); + Task second = mgmt.getExecutionManager().submit("setting test sensor", () -> { app.sensors().set(TEST_SENSOR, "second"); log.debug("set second"); - } - }); + }); first.blockUntilEnded(); second.blockUntilEnded(); @@ -394,8 +388,8 @@ public void run() { }; // Simulates firing the emit method from event handlers in different threads - mgmt.getExecutionManager().submit(overrideJob); - mgmt.getExecutionManager().submit(overrideJob); + mgmt.getExecutionManager().submit("emitting test sensor", overrideJob); + mgmt.getExecutionManager().submit("emitting test sensor", overrideJob); Asserts.eventually(Suppliers.ofInstance(seenValues), CollectionFunctionals.sizeEquals(2)); Asserts.succeedsContinually(new Runnable() { diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java index bfdac3c86e..2967f22923 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityAssertsTest.java @@ -18,7 +18,6 @@ */ package org.apache.brooklyn.core.entity; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -33,7 +32,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -85,12 +83,8 @@ public void shouldAssertAttributeEqualsEventually() throws Exception { entity.sensors().set(TestEntity.NAME, "before"); final String after = "after"; - Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after); - } - }); + Task assertValue = entity.getExecutionContext().submit("assert attr equals", + () -> EntityAsserts.assertAttributeEqualsEventually(entity, TestEntity.NAME, after)); entity.sensors().set(TestEntity.NAME, after); assertValue.get(); } @@ -105,12 +99,7 @@ public void shouldFailToAssertAttributeEqualsEventually() { @Test public void shouldAssertAttributeEventuallyNonNull() throws Exception { EntityAsserts.assertAttributeEquals(entity, TestEntity.NAME, null); - Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME); - } - }); + Task assertValue = entity.getExecutionContext().submit("assert attr non-null", () -> EntityAsserts.assertAttributeEventuallyNonNull(entity, TestEntity.NAME)); entity.sensors().set(TestEntity.NAME, "something"); assertValue.get(); } @@ -118,18 +107,11 @@ public void run() { @Test public void shouldAssertAttributeEventually() throws Exception { final CountDownLatch eventuallyEntered = new CountDownLatch(2); - Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME, new Predicate() { - @Override - public boolean apply(String input) { - eventuallyEntered.countDown(); - return input.matches(".*\\d+"); - } - }); - } - }); + Task assertValue = entity.getExecutionContext().submit("assert attribute", () -> EntityAsserts.assertAttributeEventually(entity, TestEntity.NAME, + (input) -> { + eventuallyEntered.countDown(); + return input.matches(".*\\d+"); + }) ); eventuallyEntered.await(); entity.sensors().set(TestEntity.NAME, "testing testing 123"); assertValue.get(); @@ -146,18 +128,11 @@ public void shouldAssertAttribute() { public void shouldAssertPredicateEventuallyTrue() throws Exception { final int testVal = 987654321; final CountDownLatch eventuallyEntered = new CountDownLatch(2); - Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertPredicateEventuallyTrue(entity, new Predicate() { - @Override - public boolean apply(TestEntity input) { - eventuallyEntered.countDown(); - return testVal == input.getSequenceValue(); - } - }); - } - }); + Task assertValue = entity.getExecutionContext().submit("assert predicate", () -> EntityAsserts.assertPredicateEventuallyTrue(entity, + (input) -> { + eventuallyEntered.countDown(); + return testVal == input.getSequenceValue(); + })); eventuallyEntered.await(); entity.setSequenceValue(testVal); assertValue.get(); @@ -175,12 +150,7 @@ public void shouldAssertAttributeEqualsContinually() { public void shouldFailAssertAttributeEqualsContinually() throws Throwable { final String myName = "myname"; entity.sensors().set(TestEntity.NAME, myName); - Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName); - } - }); + Task assertValue = entity.getExecutionContext().submit("check attr equals", () -> EntityAsserts.assertAttributeEqualsContinually(entity, TestEntity.NAME, myName)); entity.sensors().set(TestEntity.NAME, "something"); try { assertValue.get(); @@ -199,20 +169,10 @@ public void shouldAssertGroupSizeEqualsEventually() throws Exception { app.createAndManageChild(stooge); app.createAndManageChild(stooge); - Task assertValue1 = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3); - } - }); + Task assertValue1 = entity.getExecutionContext().submit("assert size", () -> EntityAsserts.assertGroupSizeEqualsEventually(ImmutableMap.of("timeout", "2s"), stooges, 3)); stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, STOOGE)); assertValue1.get(); - Task assertValue2 = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0); - } - }); + Task assertValue2 = entity.getExecutionContext().submit("assert size 0", () -> EntityAsserts.assertGroupSizeEqualsEventually(stooges, 0)); stooges.setEntityFilter(EntityPredicates.configEqualTo(TestEntity.CONF_NAME, "Marx Brother")); assertValue2.get(); } @@ -220,24 +180,11 @@ public void run() { @Test public void shouldAssertAttributeChangesEventually () throws Exception{ entity.sensors().set(TestEntity.NAME, "before"); - final Task assertValue = entity.getExecutionContext().submit(new Runnable() { - @Override - public void run() { - EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME); - } - }); + final Task assertValue = entity.getExecutionContext().submit("check attr change", () -> EntityAsserts.assertAttributeChangesEventually(entity, TestEntity.NAME)); Repeater.create() - .repeat(new Runnable() { - @Override - public void run() { - entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis()); - } - }).until(new Callable() { - @Override - public Boolean call() throws Exception { - return assertValue.isDone(); - } - }).every(Duration.millis(10)) + .repeat(() -> entity.sensors().set(TestEntity.NAME, "after" + System.currentTimeMillis())) + .until(() -> assertValue.isDone()) + .every(Duration.millis(10)) .run(); assertValue.get(); } diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java index ea91f366f8..8ec2794c6b 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java @@ -34,6 +34,8 @@ import org.apache.brooklyn.core.test.policy.TestPolicy; import org.apache.brooklyn.entity.group.BasicGroup; import org.apache.brooklyn.test.Asserts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -44,7 +46,9 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport { - // TODO Duplication between this and PolicySubscriptionTest + // TODO Duplication between this and PolicySubscriptionTest and LocalSubscriptionManagerTest + + private static final Logger log = LoggerFactory.getLogger(EntitySubscriptionTest.class); private static final long SHORT_WAIT_MS = 100; @@ -221,6 +225,7 @@ public void testUnsubscribeRemovesAllSubscriptionsForThatEntity() { } @Test + @SuppressWarnings("unused") public void testUnsubscribeUsingHandleStopsEvents() { SubscriptionHandle handle1 = entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE, listener); SubscriptionHandle handle2 = entity.subscriptions().subscribe(observedEntity, TestEntity.NAME, listener); @@ -300,6 +305,8 @@ public void testSubscriptionForInitialValueWhenNotValid() { @Test public void testContextEntityOnSubscriptionCallbackTask() { + log.info("Observing "+observedEntity+" from "+entity); + observedEntity.sensors().set(TestEntity.NAME, "myval"); entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener); diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java index 985997e62a..6951a973a8 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/hello/LocalEntitiesTest.java @@ -33,20 +33,19 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.entity.EntitySpec; -import org.apache.brooklyn.api.mgmt.EntityManager; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.location.SimulatedLocation; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import org.apache.brooklyn.test.Asserts; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.time.Time; import com.google.common.base.Function; import com.google.common.base.Predicate; @@ -63,14 +62,12 @@ public class LocalEntitiesTest extends BrooklynAppUnitTestSupport { public static final Logger log = LoggerFactory.getLogger(LocalEntitiesTest.class); private SimulatedLocation loc; - private EntityManager entityManager; @BeforeMethod(alwaysRun=true) @Override public void setUp() throws Exception { super.setUp(); loc = new SimulatedLocation(); - entityManager = mgmt.getEntityManager(); } @Test @@ -165,9 +162,10 @@ public void testSendMultipleInOrderThenUnsubscribe() throws Exception { h.setAge(6); long totalTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); - // TODO guava util for (1..5) - Asserts.continually(MutableMap.of("timeout", 50), Suppliers.ofInstance(data), Predicates.equalTo(ImmutableList.of(1,2,3,4,5))); - assertTrue(totalTime < 2000, "totalTime="+totalTime); //shouldn't have blocked for anywhere close to 2s (Aled says TODO: too time sensitive for BuildHive?) + Asserts.continually( + Suppliers.ofInstance(data), Predicates.equalTo(ImmutableList.of(1,2,3,4,5)), + Duration.millis(50), null, null); + assertTrue(totalTime < 2000, "totalTime="+totalTime); //shouldn't have blocked for anywhere close to 2s (unless build machine v v slow eg BuildHive) } @Test @@ -224,8 +222,8 @@ public void run() { assertTrue(System.currentTimeMillis() - startTime < 1500); synchronized (sonsConfig) { assertEquals(null, sonsConfig[0]); - for (Task tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad: {}, {}", tt, tt.getStatusDetail(false)); } - for (Task tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son: {}, {}", tt, tt.getStatusDetail(false)); } + for (Task tt : ((EntityInternal)dad).getExecutionContext().getTasks()) { log.info("task at dad: {}, {}", tt, tt.getStatusDetail(false)); } + for (Task tt : ((EntityInternal)son).getExecutionContext().getTasks()) { log.info("task at son: {}, {}", tt, tt.getStatusDetail(false)); } dad.sensors().set(HelloEntity.FAVOURITE_NAME, "Dan"); if (!s1.tryAcquire(2, TimeUnit.SECONDS)) fail("race mismatch, missing permits"); } diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java index 2251153552..f270e54770 100644 --- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java @@ -133,7 +133,7 @@ public Boolean call() { } }) .build(); - return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked(); + return DynamicTasks.get(t); } } diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java index 51f5bdccec..4f581f5036 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; @@ -40,14 +41,18 @@ import org.apache.brooklyn.core.internal.BrooklynProperties; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedItem; import org.apache.brooklyn.core.sensor.BasicAttributeSensor; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.task.BasicExecutionManager; import org.apache.brooklyn.util.core.task.ExecutionListener; +import org.apache.brooklyn.util.core.task.ScheduledTask; import org.apache.brooklyn.util.core.task.TaskBuilder; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.javalang.JavaClassNames; @@ -129,18 +134,29 @@ protected static TaskBuilder newEmptyTask(String name) { return Tasks.builder().displayName(name).dynamic(false).body(Callables.returning(null)); } - protected void assertTaskCountForEntityEventually(final Entity entity, final int expectedCount) { + protected void assertImportantTaskCountForEntityEventually(final Entity entity, final int expectedCount) { // Dead task (and initialization task) should have been GC'd on completion. // However, the GC'ing happens in a listener, executed in a different thread - the task.get() // doesn't block for it. Therefore can't always guarantee it will be GC'ed by now. Asserts.succeedsEventually(new Runnable() { @Override public void run() { - forceGc(); - Collection> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity); + forceGc(); + Collection> tasks = removeSystemTasks(BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity)); Assert.assertEquals(tasks.size(), expectedCount, "Tasks were "+tasks); }}); } + static Set> removeSystemTasks(Iterable> tasks) { + Set> result = MutableSet.of(); + for (Task t: tasks) { + if (t instanceof ScheduledTask) continue; + if (t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue; + if (t.getDisplayName().contains("Validating")) continue; + result.add(t); + } + return result; + } + // Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401 protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) { // Dead task (and initialization task) should have been GC'd on completion. @@ -149,7 +165,7 @@ protected void assertTaskMaxCountForEntityEventually(final Entity entity, final Asserts.succeedsEventually(new Runnable() { @Override public void run() { forceGc(); - Collection> tasks = BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity); + Collection> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) ); Assert.assertTrue(tasks.size() <= expectedMaxCount, "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks); }}); @@ -161,8 +177,8 @@ public void testGetTasksAndGcBoringTags() throws Exception { final Task task = runEmptyTaskWithNameAndTags(e, "should-be-kept", ManagementContextInternal.NON_TRANSIENT_TASK_TAG); runEmptyTaskWithNameAndTags(e, "should-be-gcd", ManagementContextInternal.TRANSIENT_TASK_TAG); - assertTaskCountForEntityEventually(e, 1); - Collection> tasks = BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e); + assertImportantTaskCountForEntityEventually(e, 1); + Collection> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(app.getManagementContext().getExecutionManager(), e) ); assertEquals(tasks, ImmutableList.of(task), "Mismatched tasks, got: "+tasks); } @@ -281,8 +297,6 @@ public void testGcDynamicTaskAtNormalTagLimit() throws Exception { forceGc(); stopCondition.set(true); - // might need an eventually here, if the internal job completion and GC is done in the background - // (if there are no test failures for a few months, since Sept 2014, then we can remove this comment) assertTaskMaxCountForEntityEventually(e, 2); } @@ -306,8 +320,8 @@ public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exceptio if (tag instanceof Entity && ((Entity)tag).getId().equals(eId)) { fail("tags contains unmanaged entity "+tag); } - if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).entity.getId().equals(eId) - && ((WrappedEntity)tag).wrappingType.equals(BrooklynTaskTags.CONTEXT_ENTITY)) { + if ((tag instanceof WrappedEntity) && ((WrappedEntity)tag).unwrap().getId().equals(eId) + && ((WrappedItem)tag).getWrappingType().equals(BrooklynTaskTags.CONTEXT_ENTITY)) { fail("tags contains unmanaged entity (wrapped) "+tag); } } @@ -320,7 +334,7 @@ public void testSubscriptionAndEffectorTasksGced() throws Exception { // allow background enrichers to complete Time.sleep(Duration.ONE_SECOND); forceGc(); - List> t1 = em.getAllTasks(); + Collection> t1 = em.getAllTasks(); TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); entity.sensors().set(TestEntity.NAME, "bob"); @@ -328,9 +342,17 @@ public void testSubscriptionAndEffectorTasksGced() throws Exception { Entities.destroy(entity); Time.sleep(Duration.ONE_SECOND); forceGc(); - List> t2 = em.getAllTasks(); + Collection> t2 = em.getAllTasks(); + + // no tasks from first batch were GC'd + Asserts.assertSize(MutableList.builder().addAll(t1).removeAll(t2).build(), 0); - Assert.assertEquals(t1.size(), t2.size(), "lists are different:\n"+t1+"\n"+t2+"\n"); + // and we expect just the add/remove cycle at parent, and service problems + Set newOnes = MutableList.>builder().addAll(t2).removeAll(t1).build().stream().map( + (t) -> t.getDisplayName()).collect(Collectors.toSet()); + Function prefix = (s) -> "sensor "+app.getId()+":"+s; + Assert.assertEquals(newOnes, MutableSet.of( + prefix.apply("entity.children.removed"), prefix.apply("entity.children.added"), prefix.apply("service.problems"))); } /** diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java index 4e470903ed..5c04978283 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.entity.EntitySpec; @@ -32,12 +33,22 @@ import org.apache.brooklyn.api.mgmt.SubscriptionManager; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.core.entity.RecordingSensorEventListener; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.entity.group.BasicGroup; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + /** * testing the {@link SubscriptionManager} and associated classes. */ @@ -170,4 +181,82 @@ public void run() { if (threadException.get() != null) throw threadException.get(); } + @Test + // same test as in PolicySubscriptionTest, but for entities / simpler + public void testSubscriptionReceivesInitialValueEventsInOrder() { + RecordingSensorEventListener listener = new RecordingSensorEventListener<>(); + + entity.sensors().set(TestEntity.NAME, "myname"); + entity.sensors().set(TestEntity.SEQUENCE, 123); + entity.sensors().emit(TestEntity.MY_NOTIF, -1); + + // delivery should be in subscription order, so 123 then 456 + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener); + // wait for the above delivery - otherwise it might get dropped + Asserts.succeedsEventually(() -> Asserts.assertSize(listener.getEvents(), 1)); + entity.sensors().set(TestEntity.SEQUENCE, 456); + + // notifications don't have "initial value" so don't get -1 + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener); + // but do get 1, after 456 + entity.sensors().emit(TestEntity.MY_NOTIF, 1); + + // STOPPING and myname received, in subscription order, after everything else + entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING); + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener); + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener); + + Asserts.succeedsEventually(() -> { + Asserts.assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent(TestEntity.SEQUENCE, entity, 123), + new BasicSensorEvent(TestEntity.SEQUENCE, entity, 456), + new BasicSensorEvent(TestEntity.MY_NOTIF, entity, 1), + new BasicSensorEvent(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING), + new BasicSensorEvent(TestEntity.NAME, entity, "myname")), + "actually got: "+listener.getEvents()); + }); + } + + @Test + public void testNotificationOrderMatchesSetValueOrderWhenSynched() { + RecordingSensorEventListener listener = new RecordingSensorEventListener<>(); + + AtomicInteger count = new AtomicInteger(); + Runnable set = () -> { + synchronized (count) { + entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); + } + }; + entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener); + for (int i=0; i<10; i++) { + new Thread(set).start(); + } + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { + Asserts.assertSize(listener.getEvents(), 10); }); + for (int i=0; i<10; i++) { + Assert.assertEquals(listener.getEvents().get(i).getValue(), i+1); + } + } + + @Test + public void testNotificationOrderMatchesSetValueOrderWhenNotSynched() { + RecordingSensorEventListener listener = new RecordingSensorEventListener<>(); + + AtomicInteger count = new AtomicInteger(); + Runnable set = () -> { + // as this is not synched, the sets may interleave + entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); + }; + entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener); + for (int i=0; i<10; i++) { + new Thread(set).start(); + } + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { + Asserts.assertSize(listener.getEvents(), 10); }); + // all we expect for sure is that the last value is whatever the sensor is at the end - internal update and publish is mutexed + Assert.assertEquals(listener.getEvents().get(9).getValue(), entity.sensors().get(TestEntity.SEQUENCE)); + } + } diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java index 59ea8f2f4a..88082ff9f6 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializerTest.java @@ -611,7 +611,7 @@ public void testUntypedFieldReffingEntity() throws Exception { public void testTask() throws Exception { final TestApplication app = TestApplication.Factory.newManagedInstanceForTests(); mgmt = app.getManagementContext(); - Task completedTask = app.getExecutionContext().submit(Callables.returning("myval")); + Task completedTask = app.getExecutionContext().submit("return myval", Callables.returning("myval")); completedTask.get(); String loggerName = UnwantedStateLoggingMapper.class.getName(); diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java index 5f534f99bf..85c99d5c04 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindFeedWithHaTest.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.mgmt.Task; @@ -101,10 +102,11 @@ public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover() throws Except @Override public Boolean call() throws Exception { origManagementContext.getGarbageCollector().gcIteration(); - List> tasksAfter = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks(); + List> tasksAfter = removeSystemTasks( ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks() ); log.info("tasks after disabling HA, "+tasksAfter.size()+": "+tasksAfter); return tasksAfter.isEmpty(); } + }).runRequiringTrue(); newManagementContext = createNewManagementContext(); @@ -122,6 +124,10 @@ public Boolean call() throws Exception { EntityAsserts.assertAttributeEqualsEventually(newEntity, SENSOR_STRING, "{\"foo\":\"myfoo\"}"); } + static List> removeSystemTasks(List> tasks) { + return tasks.stream().filter(t -> !("rebind".equals(t.getDisplayName()) || t.getDisplayName().contains("Validating"))).collect(Collectors.toList()); + } + @Test(groups="Integration", invocationCount=50) public void testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailoverManyTimes() throws Exception { testHttpFeedCleansUpAfterHaDisabledAndRunsAtFailover(); diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java index 45589b10cf..25eb00f84b 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerTest.java @@ -20,11 +20,8 @@ import static org.testng.Assert.assertEquals; -import java.util.concurrent.Callable; - import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; -import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.core.test.entity.TestEntityImpl; import org.apache.brooklyn.util.core.task.BasicTask; @@ -48,15 +45,7 @@ public static class TestEntityWithTaskInRebind extends TestEntityImpl { @Override public void rebind() { super.rebind(); - Task task = new BasicTask(new Callable() { - @Override public String call() { - return "abc"; - }}); - String val = DynamicTasks.queueIfPossible(task) - .orSubmitAsync() - .asTask() - .getUnchecked(); - sensors().set(TestEntity.NAME, val); + sensors().set(TestEntity.NAME, DynamicTasks.get(new BasicTask(() -> "abc"))); } } } diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java index 0f3310ec7c..4d733e66b2 100644 --- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java @@ -24,12 +24,15 @@ import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.policy.PolicySpec; import org.apache.brooklyn.core.entity.RecordingSensorEventListener; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.location.SimulatedLocation; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.core.sensor.BasicSensorEvent; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -102,6 +105,7 @@ public void testUnsubscribeRemovesAllSubscriptionsForThatEntity() throws Excepti } @Test + @SuppressWarnings("unused") public void testUnsubscribeUsingHandleStopsEvents() throws Exception { SubscriptionHandle handle1 = policy.subscriptions().subscribe(entity, TestEntity.SEQUENCE, listener); SubscriptionHandle handle2 = policy.subscriptions().subscribe(entity, TestEntity.NAME, listener); @@ -122,18 +126,37 @@ public void testUnsubscribeUsingHandleStopsEvents() throws Exception { } @Test - public void testSubscriptionReceivesInitialValueEvents() { - entity.sensors().set(TestEntity.SEQUENCE, 123); + public void testSubscriptionReceivesInitialValueEventsInOrder() { entity.sensors().set(TestEntity.NAME, "myname"); - + entity.sensors().set(TestEntity.SEQUENCE, 123); + entity.sensors().emit(TestEntity.MY_NOTIF, -1); + + // delivery should be in subscription order, so 123 then 456 policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener); + // wait for the above delivery - otherwise it might get dropped + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { + Asserts.assertSize(listener.getEvents(), 1); }); + entity.sensors().set(TestEntity.SEQUENCE, 456); + + // notifications don't have "initial value" so don't get -1 + policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener); + // but do get 1, after 456 + entity.sensors().emit(TestEntity.MY_NOTIF, 1); + + // STOPPING and myname received, in subscription order, after everything else + entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING); + policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener); policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener); - Asserts.succeedsEventually(new Runnable() { + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() { @Override public void run() { assertEquals(listener.getEvents(), ImmutableList.of( new BasicSensorEvent(TestEntity.SEQUENCE, entity, 123), - new BasicSensorEvent(TestEntity.NAME, entity, "myname"))); + new BasicSensorEvent(TestEntity.SEQUENCE, entity, 456), + new BasicSensorEvent(TestEntity.MY_NOTIF, entity, 1), + new BasicSensorEvent(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING), + new BasicSensorEvent(TestEntity.NAME, entity, "myname")), + "actually got: "+listener.getEvents()); }}); } @@ -147,7 +170,7 @@ public void testSubscriptionNotReceivesInitialValueEventsByDefault() { Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() { @Override public void run() { - assertEquals(listener.getEvents(), ImmutableList.of()); + Asserts.assertSize(listener.getEvents(), 0); }}); } diff --git a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java index a7a531b432..949f1390f3 100644 --- a/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/test/qa/performance/TaskPerformanceTest.java @@ -74,11 +74,7 @@ public void run() { .summary("TaskPerformanceTest.testExecuteSimplestRunnable") .iterations(numIterations) .minAcceptablePerSecond(minRatePerSec) - .job(new Runnable() { - @Override - public void run() { - executionManager.submit(work); - }}) + .job(() -> executionManager.submit("inner", work)) .completionLatch(completionLatch)); } diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java index 80f1be65a4..0827664ba5 100644 --- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java +++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java @@ -1325,12 +1325,8 @@ public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exc .body(new Callable() { @Override public Boolean call() throws Exception { - Task first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST); - DynamicTasks.queueIfPossible(first).orSubmitAsync(); - final Entity source = first.get(); - final Task booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP); - DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync(); - return booleanTask.get(); + final Entity source = DynamicTasks.get( DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST) ); + return DynamicTasks.get( DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP) ); } }) .build(); diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java index 9ff39c182d..e648c32dcd 100644 --- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java +++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationIntegrationTest.java @@ -123,11 +123,7 @@ public void testGetMachineDetails() throws Exception { BasicExecutionManager execManager = new BasicExecutionManager("mycontextid"); BasicExecutionContext execContext = new BasicExecutionContext(execManager); try { - MachineDetails details = execContext.submit(new Callable() { - @Override - public MachineDetails call() { - return host.getMachineDetails(); - }}).get(); + MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get(); LOG.info("machineDetails="+details); assertNotNull(details); } finally { diff --git a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java index 5c6e918e07..d25991c066 100644 --- a/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java +++ b/core/src/test/java/org/apache/brooklyn/location/ssh/SshMachineLocationTest.java @@ -131,11 +131,7 @@ public void testGetMachineDetails() throws Exception { BasicExecutionManager execManager = new BasicExecutionManager("mycontextid"); BasicExecutionContext execContext = new BasicExecutionContext(execManager); try { - MachineDetails details = execContext.submit(new Callable() { - @Override - public MachineDetails call() { - return host.getMachineDetails(); - }}).get(); + MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get(); LOG.info("machineDetails="+details); assertNotNull(details); @@ -166,11 +162,7 @@ public void testGetMachineDetailsWithExtraStdout() throws Exception { BasicExecutionManager execManager = new BasicExecutionManager("mycontextid"); BasicExecutionContext execContext = new BasicExecutionContext(execManager); try { - MachineDetails details = execContext.submit(new Callable() { - @Override - public MachineDetails call() { - return host.getMachineDetails(); - }}).get(); + MachineDetails details = execContext.submit("get details", () -> host.getMachineDetails()).get(); LOG.info("machineDetails="+details); assertNotNull(details); diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java index 90d6b06eba..2656c805ee 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TaskPredicatesTest.java @@ -82,14 +82,13 @@ public void testDisplayNameSatisfies() throws Exception { @Test public void testIsDone() throws Exception { CountDownLatch latch = new CountDownLatch(1); - Task task = app.getExecutionContext().submit(new Runnable() { - public void run() { + Task task = app.getExecutionContext().submit("await latch", () -> { try { latch.await(); } catch (InterruptedException e) { throw Exceptions.propagate(e); } - }}); + }); assertFalse(TaskPredicates.isDone().apply(task)); diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java index 990e7f777d..8c656a34b9 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/TasksTest.java @@ -248,8 +248,8 @@ public void run() { for (Object tag : Tasks.current().getTags()) { if (tag instanceof WrappedEntity) { WrappedEntity wrapped = (WrappedEntity)tag; - if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.wrappingType)) { - context.add(wrapped.entity); + if (BrooklynTaskTags.CONTEXT_ENTITY.equals(wrapped.getWrappingType())) { + context.add(wrapped.unwrap()); } } } diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java index 1f5c754b13..455f8ad919 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -87,19 +88,17 @@ public void testUnsubmittedTaskWhenNoExecutionContextFails() { Assert.assertTrue(result.isAbsent(), "result="+result); Exception exception = Maybe.getException(result); - Assert.assertTrue(exception.toString().contains("no execution context available"), "exception="+exception); + Asserts.assertStringContains(exception.toString(), "no execution context available"); + + Asserts.assertThat(t, (tt) -> !tt.isBegun()); } public void testUnsubmittedTaskWithExecutionContextExecutesAndReturns() { final Task t = newSleepTask(Duration.ZERO, "foo"); // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task. - Maybe result = app.getExecutionContext() - .submit(new Callable >() { - @Override - public Maybe call() throws Exception { - return Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe(); - }}) + Maybe result = app.getExecutionContext() + .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe()) .getUnchecked(); Assert.assertEquals(result.get(), "foo"); @@ -110,17 +109,83 @@ public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOut() { // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task. // However, it will quickly timeout as the task will not have completed. - Maybe result = app.getExecutionContext() - .submit(new Callable >() { - @Override - public Maybe call() throws Exception { - return Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe(); - }}) + Maybe result = app.getExecutionContext() + .submit("resolving sleep task", () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe()) .getUnchecked(); Assert.assertTrue(result.isAbsent(), "result="+result); Exception exception = Maybe.getException(result); Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception); + + Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS); + Asserts.assertThat(t, (tt) -> !tt.isDone()); + } + + public void testUnsubmittedTaskWithExecutionContextExecutesAndReturnsForeground() { + final Task t = newSleepTask(Duration.ZERO, "foo"); + + // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task. + Maybe result = app.getExecutionContext() + .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() )); + + Assert.assertEquals(result.get(), "foo"); + } + + public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutForeground() { + final Task t = newSleepTask(Duration.ONE_MINUTE, "foo"); + + // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task. + // However, it will quickly timeout as the task will not have completed. + Maybe result = app.getExecutionContext() + .get(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() )); + + Assert.assertTrue(result.isAbsent(), "result="+result); + Exception exception = Maybe.getException(result); + Assert.assertTrue(exception.toString().contains("not completed when immediate completion requested"), "exception="+exception); + + Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS); + Asserts.assertThat(t, (tt) -> !tt.isDone()); + } + + public void testUnsubmittedTaskWithExecutionContextTimesOutWhenImmediate() { + final Task t = newSleepTask(Duration.ZERO, "foo"); + + // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task + Maybe> result = app.getExecutionContext() + .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() )); + + // However, the resubmission will not be waited upon + Assert.assertTrue(result.isPresent(), "result="+result); + Assert.assertTrue(result.get().isAbsent(), "result="+result); + Exception exception = Maybe.getException(result.get()); + + Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available"); + + // But the underlying task is running + Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS); + Asserts.eventually(() -> t, (tt) -> tt.isDone(), Duration.TEN_SECONDS); + Asserts.assertThat(t, (tt) -> Objects.equals(tt.getUnchecked(), "foo")); + + // And subsequent get _is_ immediate + result = app.getExecutionContext() + .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Asserts.DEFAULT_LONG_TIMEOUT).getMaybe() )); + Assert.assertEquals(result.get().get(), "foo"); + } + + public void testUnsubmittedTaskWithExecutionContextExecutesAndTimesOutImmediate() { + final Task t = newSleepTask(Duration.ONE_MINUTE, "foo"); + + // Below, we call ValueResolver.getMaybe() in app's execution context. Therefore it will execute the task. + // However, it will quickly timeout as the task will not have completed. + Maybe> result = app.getExecutionContext() + .getImmediately(new BasicTask<>( () -> Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe() )); + + Assert.assertTrue(result.isPresent(), "result="+result); + Assert.assertTrue(result.get().isAbsent(), "result="+result); + Exception exception = Maybe.getException(result.get()); + Asserts.assertStringContainsIgnoreCase(exception.toString(), "immediate", "not", "available"); + Asserts.eventually(() -> t, (tt) -> tt.isBegun(), Duration.TEN_SECONDS); + Asserts.assertThat(t, (tt) -> !tt.isDone()); } public void testSwallowError() { @@ -161,7 +226,7 @@ public void testImmediateSupplierWithTimeoutUsesBlocking() { public void testGetImmediatelyInTask() throws Exception { final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(); - Task task = app.getExecutionContext().submit(new Callable() { + Task task = app.getExecutionContext().submit("test task for call stack", new Callable() { @Override public CallInfo call() { return myUniquelyNamedMethod(); diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java index 2078779ab2..555d7cb866 100644 --- a/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java +++ b/locations/jclouds/src/main/java/org/apache/brooklyn/policy/jclouds/os/CreateUserPolicy.java @@ -110,11 +110,7 @@ public void onEvent(SensorEvent event) { } protected void addUserAsync(final Entity entity, final SshMachineLocation machine) { - ((EntityInternal)entity).getExecutionContext().execute(new Runnable() { - @Override - public void run() { - addUser(entity, machine); - }}); + getExecutionContext().execute(() -> addUser(entity, machine)); } protected void addUser(Entity entity, SshMachineLocation machine) { diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java index 840335a475..c6bd33d5fe 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java @@ -21,7 +21,6 @@ import static org.apache.brooklyn.util.time.Time.makeTimeStringRounded; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -30,19 +29,18 @@ import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.policy.AbstractPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; -import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.ScheduledTask; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.reflect.TypeToken; @@ -194,7 +192,7 @@ public void resume() { protected void doStartPolling() { if (scheduledTask == null || scheduledTask.isDone()) { ScheduledTask task = ScheduledTask.builder(pollingTaskFactory).displayName( getTaskName() ).period(getPollPeriod()).build(); - scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task); + scheduledTask = getExecutionContext().submit(task); } } @@ -270,7 +268,6 @@ protected void schedulePublish() { schedulePublish(0); } - @SuppressWarnings("unchecked") protected void schedulePublish(long delay) { if (isRunning() && executorQueued.compareAndSet(false, true)) { long now = System.currentTimeMillis(); @@ -279,8 +276,9 @@ protected void schedulePublish(long delay) { Runnable job = new PublishJob(); - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); + ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector iteration").build()) + .delay(Duration.millis(delay)).displayName("Failure detector scheduler").build(); + getExecutionContext().submit(task); } } diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java index 2cbbf28e42..e1435825c4 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java @@ -39,6 +39,7 @@ import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.ScheduledTask; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; @@ -314,7 +315,6 @@ private String getFailureDescription(long now) { return description; } - @SuppressWarnings({ "rawtypes" }) protected void recomputeAfterDelay(long delay) { // TODO Execute in same thread as other onEvent calls are done in (i.e. same conceptually // single-threaded executor as the subscription-manager will use). @@ -352,8 +352,9 @@ protected void recomputeAfterDelay(long delay) { } }; - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); + ScheduledTask task = ScheduledTask.builder(() -> Tasks.builder().body(job).dynamic(false).displayName("Failure detector recompute").build()) + .delay(Duration.millis(delay)).displayName("Failure detector recompute after delay").build(); + getExecutionContext().submit(task); } private String getTimeStringSince(Long time) { diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java index 3a1ba800c3..c79933688d 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceReplacer.java @@ -129,10 +129,7 @@ public void setEntity(final EntityLocal entity) { if (isRunning()) { highlightViolation("Failure detected"); LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")"); - ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() { - @Override public void run() { - onDetectedFailure(event); - }}); + getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event)); } else { LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")"); } @@ -174,12 +171,9 @@ protected synchronized void onDetectedFailure(SensorEvent event) { return; } - highlightViolation(violationText+", triggering restart"); + highlightViolation(violationText+", triggering replacement"); LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")"); - Task t = ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() { - - @Override - public void run() { + Task t = getExecutionContext().submit("Replace member on failure", () -> { try { Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get(); consecutiveReplacementFailureTimes.clear(); @@ -191,8 +185,7 @@ public void run() { highlightViolation(violationText+" and replace attempt failed: "+Exceptions.collapseText(e)); onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason); } - } - }); + }); highlightAction("Replacing "+failedEntity, t); } diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java index 8faaf89995..3c0341e3ac 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceRestarter.java @@ -29,14 +29,12 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.trait.Startable; import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.core.sensor.BasicNotificationSensor; import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; -import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.javalang.JavaClassNames; import org.apache.brooklyn.util.time.Duration; @@ -112,10 +110,7 @@ public void setEntity(final EntityLocal entity) { if (isRunning()) { LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")"); - ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() { - @Override public void run() { - onDetectedFailure(event); - }}); + getExecutionContext().submit("Analyzing detected failure", () -> onDetectedFailure(event)); } else { LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")"); } diff --git a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java index 391f2bb3bc..e5fc5b236d 100644 --- a/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java +++ b/rest/rest-resources/src/test/java/org/apache/brooklyn/rest/resources/SensorResourceTest.java @@ -325,7 +325,7 @@ public void testGetSensorValueOfTypeFeed() throws Exception { @Test public void testGetSensorValueOfTypeCompletedTask() throws Exception { - Task task = entity.getExecutionContext().submit(Callables.returning("myval")); + Task task = entity.getExecutionContext().submit("returning myval", Callables.returning("myval")); task.get(); entity.sensors().set(Sensors.newSensor(Task.class, "myTask"), task); doGetSensorTest("myTask", String.class, "\"myval\""); diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java index 54c81d1e8e..d6e3fce236 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java @@ -50,32 +50,32 @@ import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody; import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody; import org.apache.brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody; -import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode; +import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.feed.http.HttpFeed; import org.apache.brooklyn.feed.http.HttpPollConfig; import org.apache.brooklyn.feed.http.HttpValueFunctions; import org.apache.brooklyn.feed.http.JsonFunctions; -import org.apache.http.HttpStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.util.collections.Jsonya; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.http.HttpToolResponse; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.TaskTags; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException; import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.http.HttpToolResponse; import org.apache.brooklyn.util.javalang.Enums; import org.apache.brooklyn.util.javalang.JavaClassNames; import org.apache.brooklyn.util.repeat.Repeater; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; +import org.apache.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; @@ -222,7 +222,7 @@ protected void postStop() { // we could wait for BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty(); Task stopEffectorTask = BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP); Task topEntityTask = getTopEntityTask(stopEffectorTask); - getManagementContext().getExecutionManager().submit(new UnmanageTask(topEntityTask, this)); + getManagementContext().getExecutionManager().submit("Unmanage Brooklyn entity after stop", new UnmanageTask(topEntityTask, this)); } } diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java index faed3dbdc8..52ce82fb42 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SameServerDriverLifecycleEffectorTasks.java @@ -111,10 +111,7 @@ protected Map obtainProvisioningFlags(Entity entity, MachineProvi @Override protected String startProcessesAtMachine(Supplier machineS) { - DynamicTasks.queueIfPossible(StartableMethods.startingChildren(entity(), machineS.get())) - .orSubmitAsync(entity()) - .getTask() - .getUnchecked(); + DynamicTasks.get(StartableMethods.startingChildren(entity(), machineS.get()), entity()); DynamicTasks.waitForLast(); return "children started"; } diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java index d5d9751519..55b273ee74 100644 --- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java +++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java @@ -211,14 +211,14 @@ protected void preStart() { String command = JOINER_ON_SPACE.join(allParams); log.debug("Windows performance counter poll command for {} will be: {}", entity, command); - GetPerformanceCountersJob job = new GetPerformanceCountersJob(getEntity(), command); + GetPerformanceCountersJob job = new GetPerformanceCountersJob(getEntity(), command); getPoller().scheduleAtFixedRate( - new CallInEntityExecutionContext(entity, job), + new CallInExecutionContext(this, job), new SendPerfCountersToSensors(getEntity(), polls), minPeriod); } - private static class GetPerformanceCountersJob implements Callable { + private static class GetPerformanceCountersJob implements Callable { private final Entity entity; private final String command; @@ -229,15 +229,14 @@ private static class GetPerformanceCountersJob implements Callable { } @Override - @SuppressWarnings("unchecked") - public T call() throws Exception { + public WinRmToolResponse call() throws Exception { Maybe machineLocationMaybe = Machines.findUniqueMachineLocation(entity.getLocations(), WinRmMachineLocation.class); if (machineLocationMaybe.isAbsent()) { return null; } WinRmMachineLocation machine = EffectorTasks.getMachine(entity, WinRmMachineLocation.class); WinRmToolResponse response = machine.executePsScript(command); - return (T)response; + return response; } } @@ -254,18 +253,18 @@ protected Poller getPoller() { * * @param The type of the {@link java.util.concurrent.Callable}. */ - private static class CallInEntityExecutionContext implements Callable { + private static class CallInExecutionContext implements Callable { private final Callable job; - private Entity entity; + private AbstractFeed feed; - private CallInEntityExecutionContext(Entity entity, Callable job) { + private CallInExecutionContext(AbstractFeed feed, Callable job) { this.job = job; - this.entity = entity; + this.feed = feed; } @Override public T call() throws Exception { - ExecutionContext executionContext = ((EntityInternal) entity).getExecutionContext(); + ExecutionContext executionContext = feed.getExecutionContext(); return executionContext.submit(Maps.newHashMap(), job).get(); } } diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java index dc672b5674..3cd70b1087 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java +++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java @@ -1452,8 +1452,24 @@ public static void eventuallyOnNotify(T object, Predicate predicate) { } public static void assertSize(Iterable list, int expectedSize) { - if (list==null) fail("List is null"); - if (Iterables.size(list)!=expectedSize) fail("List has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list); + if (list==null) fail("Collection is null"); + if (Iterables.size(list)!=expectedSize) fail("Collection has wrong size "+Iterables.size(list)+" (expected "+expectedSize+"): "+list); + } + + public static void assertSize(Map map, int expectedSize) { + if (map==null) fail("Map is null"); + if (Iterables.size(map.keySet())!=expectedSize) fail("Map has wrong size "+map.size()+" (expected "+expectedSize+"): "+map); + } + + /** Ignores duplicates and order */ + public static void assertSameUnorderedContents(Iterable i1, Iterable i2) { + if (i1==null || i2==null) { + if (i1==null && i2==null) { + return ; + } + fail("Collections differ in that one is null: "+i1+" and "+i2); + } + assertEquals(MutableSet.copyOf(i1), MutableSet.copyOf(i2)); } public static void assertInstanceOf(Object obj, Class type) { diff --git a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java index bd411b707b..1105acd286 100644 --- a/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java +++ b/utils/common/src/test/java/org/apache/brooklyn/test/AssertsTest.java @@ -25,7 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.brooklyn.test.Asserts.ShouldHaveFailedPreviouslyAssertionError; +import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; import org.testng.Assert; @@ -170,4 +172,19 @@ public void testShouldHaveFailed() { // check code flowed the way we expected Asserts.assertEquals(reached, 3); } + + @Test + public void testAssertSize() { + Asserts.assertSize(MutableList.of("x", "x", "y"), 3); + Asserts.assertSize(MutableSet.of("x", "x", "y"), 2); + Asserts.assertSize(MutableMap.of("x", "x", "y", "y"), 2); + } + + @Test + public void testAssertSetListEqualityAndSameUnoderderedContents() { + Assert.assertEquals(MutableSet.of("x", "x", "y"), MutableSet.of("x", "y", "x")); + Assert.assertNotEquals(MutableList.of("x", "x", "y"), MutableList.of("x", "y", "x")); + // above are baseline checks + Asserts.assertSameUnorderedContents(MutableList.of("x", "x", "y"), MutableList.of("y", "x")); + } }