Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
37b6b11
task visibility: validation of config
ahgittin Sep 12, 2017
7f4d7bd
task visibility: entity mgmt create and startup wrapped in its own task
ahgittin Sep 12, 2017
4430f76
task optimization: some queued-or-submitted tasks use foreground for …
ahgittin Sep 15, 2017
8ecf395
task visibility: better names for config retrieval tasks
ahgittin Sep 15, 2017
d4c9fe1
entity adjuncts have extra tag for execution context, used in subscri…
ahgittin Sep 19, 2017
84d24d1
fix visibility: entity init runs in same thread
ahgittin Sep 19, 2017
0a1acec
fix deadlock in initial publication of sensors on setting up a subscr…
ahgittin Sep 19, 2017
e1f948a
task visibility: entity initialization
ahgittin Sep 19, 2017
b0556de
include adjunct info as a subscription description
ahgittin Sep 19, 2017
aeecd3e
task GC and visibility: tidy GC code, don't delete some things eg sub…
ahgittin Sep 20, 2017
79cc9bc
task visibility: ensure all tasks have a name, updating exec.submit()…
ahgittin Sep 20, 2017
130a29b
fix tests that asserted specific tasks (as there are now more)
ahgittin Sep 20, 2017
80446ab
Merge branch 'master' into tasks-better-tree
ahgittin Sep 25, 2017
6dfe498
deprecated since is now 0.13.0 not 0.12.0
ahgittin Sep 25, 2017
d7b086b
Merge branch 'tasks-better' into tasks-better-tree
ahgittin Sep 28, 2017
bb26d32
change when cancellation is done for getImmediate - means effector in…
ahgittin Sep 28, 2017
0c2e1f6
Merge branch 'master' into tasks-4
ahgittin Oct 4, 2017
9888870
switch queue-or-submit-blocking-then-get invocations to new simpler D…
ahgittin Oct 2, 2017
8b72769
Tasks.tryQueueing won't queue if calling thread is interrupted
ahgittin Oct 4, 2017
508183b
more assertion routines, map equality and unordered iterable equality
ahgittin Oct 4, 2017
9213f0e
fix message publish synching to guarantee in-order delivery
ahgittin Oct 4, 2017
2dcb0a0
address review comments
ahgittin Oct 6, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,22 @@ public interface ExecutionContext extends Executor {
*/
<T> Task<T> submit(Map<?,?> properties, Callable<T> callable);

/** {@link ExecutionManager#submit(Runnable) */
/** {@link ExecutionManager#submit(Runnable)
* @deprecated since 0.13.0 pass a display name or a more detailed map */
@Deprecated
Task<?> submit(Runnable runnable);

/** {@link ExecutionManager#submit(Callable) */
/** {@link ExecutionManager#submit(Callable)
* @deprecated since 0.13.0 pass a display name or a more detailed map */
@Deprecated
<T> Task<T> submit(Callable<T> callable);

/** {@link ExecutionManager#submit(String, Runnable) */
Task<?> submit(String displayName, Runnable runnable);

/** {@link ExecutionManager#submit(String, Callable) */
<T> Task<T> submit(String displayName, Callable<T> callable);

/** See {@link ExecutionManager#submit(Map, TaskAdaptable)}. */
<T> Task<T> submit(TaskAdaptable<T> task);

Expand Down Expand Up @@ -86,18 +96,23 @@ public interface ExecutionContext extends Executor {
// TODO reference ImmediateSupplier when that class is moved to utils project
@Beta
<T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);
/** As {@link #getImmediately(Object)} but strongly typed for a task. */
@Beta
<T> Maybe<T> getImmediately(Task<T> callableOrSupplierOrTask);

/**
* Efficient shortcut for {@link #submit(TaskAdaptable)} followed by an immediate {@link Task#get()}.
* Efficient implementation of common case when {@link #submit(TaskAdaptable)} is followed by an immediate {@link Task#get()}.
* <p>
* Implementations will typically attempt to execute in the current thread, with appropriate
* configuration to make it look like it is in a sub-thread,
* ie registering this as a task and allowing
* context methods on tasks to see the given sub-task.
* This is efficient in that it may typically attempt to execute in the current thread,
* with appropriate configuration to make it look like it is in a sub-thread,
* ie registering this as a task and allowing context methods on tasks to see the given sub-task.
* However it will normally be non-blocking which reduces overhead and
* is permissible within a {@link #getImmediately(Object)} task
* <p>
* If the argument has already been submitted it simply blocks on it.
* If the argument has already been submitted it simply blocks on it
* (i.e. no additional execution, and in that case would fail within a {@link #getImmediately(Object)}).
*
* @param task
* @param task the task whose result is being sought
* @return result of the task execution
*/
@Beta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,21 @@ public interface ExecutionManager {
// /** returns all tasks known to this manager (immutable) */
// public Set<Task<?>> getAllTasks();

/** see {@link #submit(Map, TaskAdaptable)} */
/** see {@link #submit(Map, TaskAdaptable)}
* @deprecated since 0.13.0 pass displayName or map */
@Deprecated
public Task<?> submit(Runnable r);

/** see {@link #submit(Map, TaskAdaptable)}
* @deprecated since 0.13.0 pass displayName or map */
@Deprecated
public <T> Task<T> submit(Callable<T> r);

/** see {@link #submit(Map, TaskAdaptable)} */
public Task<?> submit(String displayName, Runnable c);

/** see {@link #submit(Map, TaskAdaptable)} */
public <T> Task<T> submit(Callable<T> c);
public <T> Task<T> submit(String displayName, Callable<T> c);

/** see {@link #submit(Map, TaskAdaptable)} */
public <T> Task<T> submit(TaskAdaptable<T> task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityManager;
import org.apache.brooklyn.api.mgmt.rebind.RebindManager;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry;
import org.apache.brooklyn.config.StringConfigMap;
import org.apache.brooklyn.util.guava.Maybe;
Expand Down Expand Up @@ -165,6 +166,9 @@ public interface ManagementContext {
*/
ExecutionContext getExecutionContext(Entity entity);

/** As {@link #getExecutionContext(Entity)} where there is also an adjunct */
ExecutionContext getExecutionContext(Entity e, EntityAdjunct a);

/**
* Returns a {@link SubscriptionContext} instance representing subscriptions
* (from the {@link SubscriptionManager}) associated with this entity, and capable
Expand All @@ -176,6 +180,9 @@ public interface ManagementContext {
*/
SubscriptionContext getSubscriptionContext(Entity entity);

/** As {@link #getSubscriptionContext(Entity)} where there is also an adjunct */
SubscriptionContext getSubscriptionContext(Entity e, EntityAdjunct a);

/**
* Returns a {@link SubscriptionContext} instance representing subscriptions
* (from the {@link SubscriptionManager}) associated with this location, and capable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public interface SubscriptionManager {
*
* The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)} later.
* <p>
* The listener callback is in-order single-threaded and synchronized on this object. The flags
* parameters can include the following:
* The listener callback is in-order single-threaded and synchronized on this object.
* In other words message delivery from a producer to a given subscriber is in publish order
* (or in the case of a late subscriber getting initial values, in subscribe order).
* The flags parameters can include the following:
* <ul>
* <li>subscriber - object to identify the subscriber (e.g. entity, or console session uid)
* <li><i>in future</i> - control parameters for the subscription (period, minimum delta for updates, etc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,13 +934,12 @@ public void testManuallyAddInTaskOfOtherEntity() throws Exception {
Entity app = createStartWaitAndLogApplication(yaml);
final TestEntity entity1 = (TestEntity) Iterables.getOnlyElement(app.getChildren());

TestEntity entity2 = entity1.getExecutionContext().submit(new Callable<TestEntity>() {
public TestEntity call() {
TestEntity entity2 = entity1.addChild(EntitySpec.create(TestEntity.class));
entity2.start(Collections.<Location>emptyList());
return entity2;
}
}).get();
TestEntity entity2 = entity1.getExecutionContext().submit("create and start", () -> {
TestEntity entity2i = entity1.addChild(EntitySpec.create(TestEntity.class));
entity2i.start(Collections.<Location>emptyList());
return entity2i;
})
.get();

Entities.dumpInfo(app);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public Entity call() throws Exception {
protected void runConcurrentWorker(Supplier<Runnable> taskSupplier) {
Collection<Task<?>> results = new ArrayList<>();
for (int i = 0; i < MAX_PARALLEL_RESOLVERS; i++) {
Task<?> result = app.getExecutionContext().submit(taskSupplier.get());
Task<?> result = app.getExecutionContext().submit("parallel "+i, taskSupplier.get());
results.add(result);
}
for (Task<?> result : results) {
Expand Down Expand Up @@ -550,7 +550,7 @@ public Maybe<?> call() throws Exception {
}
};
if (execInTask) {
Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit(job);
Task<Maybe<?>> task = ((EntityInternal)context).getExecutionContext().submit("Resolving DSL for test: "+dsl, job);
task.get(Asserts.DEFAULT_LONG_TIMEOUT);
assertTrue(task.isDone());
return task.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.core.objs.BrooklynObjectPredicate;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,12 +115,21 @@ public ConfigConstraints(T brooklynObject) {
abstract Iterable<ConfigKey<?>> getBrooklynObjectTypeConfigKeys();

public Iterable<ConfigKey<?>> getViolations() {
// TODO in new task
return validateAll();
ExecutionContext exec =
getBrooklynObject() instanceof EntityInternal ? ((EntityInternal)getBrooklynObject()).getExecutionContext() :
getBrooklynObject() instanceof AbstractEntityAdjunct ? ((AbstractEntityAdjunct)getBrooklynObject()).getExecutionContext() :
null;
if (exec!=null) {
return exec.get(
Tasks.<Iterable<ConfigKey<?>>>builder().dynamic(false).displayName("Validating config").body(
() -> validateAll() ).build() );
} else {
return validateAll();
}
}

@SuppressWarnings("unchecked")
private Iterable<ConfigKey<?>> validateAll() {
protected Iterable<ConfigKey<?>> validateAll() {
List<ConfigKey<?>> violating = Lists.newLinkedList();
Iterable<ConfigKey<?>> configKeys = getBrooklynObjectTypeConfigKeys();
LOG.trace("Checking config keys on {}: {}", getBrooklynObject(), configKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ public void setManagementContext(ManagementContextInternal managementContext) {
* through this method. Internally, all attribute updates synch on this object. Code wishing to
* update attributes or publish while holding some other lock should acquire the monitor on this
* object first to prevent deadlock. */
@Beta
protected Object getAttributesSynchObjectInternal() {
return attributesInternal.getSynchObjectInternal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ private StartableMethods() {}
/** Common implementation for start in parent nodes; just invokes start on all children of the entity */
public static void start(Entity e, Collection<? extends Location> locations) {
log.debug("Starting entity "+e+" at "+locations);
DynamicTasks.queueIfPossible(startingChildren(e, locations)).orSubmitAsync(e).getTask().getUnchecked();
DynamicTasks.get(startingChildren(e, locations), e);
}

/** Common implementation for stop in parent nodes; just invokes stop on all children of the entity */
public static void stop(Entity e) {
log.debug("Stopping entity "+e);
DynamicTasks.queueIfPossible(stoppingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
DynamicTasks.get(stoppingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Stopped entity "+e);
}

/** Common implementation for restart in parent nodes; just invokes restart on all children of the entity */
public static void restart(Entity e) {
log.debug("Restarting entity "+e);
DynamicTasks.queueIfPossible(restartingChildren(e)).orSubmitAsync(e).getTask().getUnchecked();
DynamicTasks.get(restartingChildren(e), e);
if (log.isDebugEnabled()) log.debug("Restarted entity "+e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void start() {

for (final Callable<?> oneOffJob : oneOffJobs) {
Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).displayName("Poll").description("One-time poll job "+oneOffJob).build();
oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
oneOffTasks.add(feed.getExecutionContext().submit(task));
}

Duration minPeriod = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ public String toString() {
*/
@Beta
public static BasicMachineDetails forSshMachineLocationLive(SshMachineLocation location) {
return TaskTags.markInessential(DynamicTasks.queueIfPossible(taskForSshMachineLocation(location))
.orSubmitAsync()
.asTask())
.getUnchecked();
return DynamicTasks.get(TaskTags.markInessential(taskForSshMachineLocation(location)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static HostAndPort getBrooklynAccessibleAddress(Entity entity, int port)
public static String getResolvedAddress(Entity entity, SshMachineLocation origin, String hostnameTarget) {
ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(origin, "ping -c 1 -t 1 "+hostnameTarget)
.summary("checking resolution of "+hostnameTarget).allowingNonZeroExitCode().newTask();
DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).asTask().blockUntilEnded();
DynamicTasks.queueIfPossible(task).orSubmitAndBlock(entity).getTask().blockUntilEnded();
if (task.asTask().isError()) {
log.warn("ping could not be run, at "+entity+" / "+origin+": "+Tasks.getError(task.asTask()));
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
import org.apache.brooklyn.api.objs.EntityAdjunct;
import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
Expand All @@ -59,13 +60,14 @@
/** Provides utilities for making Tasks easier to work with in Brooklyn.
* Main thing at present is to supply (and find) wrapped entities for tasks to understand the
* relationship of the entity to the task.
* TODO Longer term it would be better to remove 'tags' on Tasks and use a strongly typed context object.
* <p>
* Eventually it may be better to replace these 'tags' on Tasks with strongly typed context objects.
* (Tags there are used mainly for determining who called it (caller), what they called it on (target entity),
* and what type of task it is (effector, schedule/sensor, etc).)
*/
public class BrooklynTaskTags extends TaskTags {

private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.WrappedEntity.class);
private static final Logger log = LoggerFactory.getLogger(BrooklynTaskTags.class);

/** Tag for tasks which are running on behalf of the management server, rather than any entity */
public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER";
Expand All @@ -85,37 +87,66 @@ public class BrooklynTaskTags extends TaskTags {

// ------------- entity tags -------------------------

public static class WrappedEntity {
public abstract static class WrappedItem<T> {
/** @deprecated since 0.13.0 going private; use {@link #getWrappingType()} */
@Deprecated
public final String wrappingType;
public final Entity entity;
protected WrappedEntity(String wrappingType, Entity entity) {
protected WrappedItem(String wrappingType) {
Preconditions.checkNotNull(wrappingType);
Preconditions.checkNotNull(entity);
this.wrappingType = wrappingType;
this.entity = entity;
}
public abstract T unwrap();
public String getWrappingType() {
return wrappingType;
}
@Override
public String toString() {
return "Wrapped["+wrappingType+":"+entity+"]";
return "Wrapped["+getWrappingType()+":"+unwrap()+"]";
}
@Override
public int hashCode() {
return Objects.hashCode(entity, wrappingType);
return Objects.hashCode(unwrap(), getWrappingType());
}
@Override
public boolean equals(Object obj) {
if (this==obj) return true;
if (!(obj instanceof WrappedEntity)) return false;
if (!(obj instanceof WrappedItem)) return false;
return
Objects.equal(entity, ((WrappedEntity)obj).entity) &&
Objects.equal(wrappingType, ((WrappedEntity)obj).wrappingType);
Objects.equal(unwrap(), ((WrappedItem<?>)obj).unwrap()) &&
Objects.equal(getWrappingType(), ((WrappedItem<?>)obj).getWrappingType());
}
}
public static class WrappedEntity extends WrappedItem<Entity> {
/** @deprecated since 0.13.0 going private; use {@link #unwrap()} */
@Deprecated
public final Entity entity;
protected WrappedEntity(String wrappingType, Entity entity) {
super(wrappingType);
this.entity = Preconditions.checkNotNull(entity);
}
@Override
public Entity unwrap() {
return entity;
}
}
public static class WrappedObject<T> extends WrappedItem<T> {
private final T object;
protected WrappedObject(String wrappingType, T object) {
super(wrappingType);
this.object = Preconditions.checkNotNull(object);
}
@Override
public T unwrap() {
return object;
}
}

public static final String CONTEXT_ENTITY = "contextEntity";
public static final String CALLER_ENTITY = "callerEntity";
public static final String TARGET_ENTITY = "targetEntity";

public static final String CONTEXT_ADJUNCT = "contextAdjunct";

/**
* Marks a task as running in the context of the entity. This means
* resolving any relative/context sensitive values against that entity.
Expand All @@ -138,6 +169,15 @@ public static WrappedEntity tagForTargetEntity(Entity entity) {
return new WrappedEntity(TARGET_ENTITY, entity);
}

/**
* As {@link #tagForContextEntity(Entity)} but wrapping an adjunct.
* Tasks with this tag will also have a {@link #tagForContextEntity(Entity)}.
*/
public static WrappedObject<EntityAdjunct> tagForContextAdjunct(EntityAdjunct adjunct) {
return new WrappedObject<EntityAdjunct>(CONTEXT_ADJUNCT, adjunct);
}


public static WrappedEntity getWrappedEntityTagOfType(Task<?> t, String wrappingType) {
if (t==null) return null;
return getWrappedEntityTagOfType( getTagsFast(t), wrappingType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ public static <T extends Application> T createUnstarted(ManagementContext mgmt,
*/
@Beta
public static <T extends Application> T createUnstarted(ManagementContext mgmt, EntitySpec<T> spec, Optional<String> entityId) {
// TODO wrap in task
T app = ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId);
return app;
return mgmt.getServerExecutionContext().get(Tasks.<T>builder().dynamic(false)
.displayName("Creating entity "+
(Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName()) )
.body(() -> ((EntityManagerInternal)mgmt.getEntityManager()).createEntity(spec, entityId))
.build() );
}

/** as {@link #createUnstarted(ManagementContext, EntitySpec)} but for a string plan (e.g. camp yaml) */
Expand Down
Loading