From 00deb7df05b52cc5168e6f56f3cc66739f11009c Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Wed, 13 Sep 2017 18:03:56 +0100 Subject: [PATCH 1/2] New periodic and scheduled effector policies --- .../AbstractScheduledEffectorPolicy.java | 220 ++++++++++++++++++ .../policy/action/PeriodicEffectorPolicy.java | 102 ++++++++ .../action/ScheduledEffectorPolicy.java | 147 ++++++++++++ policy/src/main/resources/catalog.bom | 15 ++ .../action/PeriodicEffectorPolicyTest.java | 98 ++++++++ .../action/ScheduledEffectorPolicyTest.java | 116 +++++++++ 6 files changed, 698 insertions(+) create mode 100644 policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java create mode 100644 policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java create mode 100644 policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java create mode 100644 policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java create mode 100644 policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java new file mode 100644 index 0000000000..73438c6c3d --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.EntityInitializers; +import org.apache.brooklyn.core.entity.trait.Startable; +import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.config.ResolvingConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.DurationPredicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; + +@Beta +public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy implements Runnable, SensorEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduledEffectorPolicy.class); + + public static final String TIME_FORMAT = "HH:mm:ss"; + public static final String NOW = "now"; + public static final String IMMEDIATELY = "immediately"; + + private static final DateFormat FORMATTER = SimpleDateFormat.getTimeInstance(); + + public static final ConfigKey EFFECTOR = ConfigKeys.builder(String.class) + .name("effector") + .description("The effector to be executed by this policy") + .constraint(Predicates.notNull()) + .build(); + + public static final ConfigKey> EFFECTOR_ARGUMENTS = ConfigKeys.builder(new TypeToken>() { }) + .name("args") + .description("The effector arguments and their values") + .constraint(Predicates.notNull()) + .defaultValue(ImmutableMap.of()) + .build(); + + public static final ConfigKey TIME = ConfigKeys.builder(String.class) + .name("time") + .description("An optional time when this policy should be first executed, formatted as HH:mm:ss") + .build(); + + public static final ConfigKey WAIT = ConfigKeys.builder(Duration.class) + .name("wait") + .description("An optional duration after which this policy should be first executed. The time config takes precedence if present") + .constraint(Predicates.or(Predicates.isNull(), DurationPredicates.positive())) + .build(); + + public static final ConfigKey> START_SENSOR = ConfigKeys.builder(new TypeToken>() { }) + .name("start.sensor") + .description("The sensor which should trigger starting the periodic execution scheduler") + .defaultValue(Startable.SERVICE_UP) + .build(); + + public static final ConfigKey RUNNING = ConfigKeys.builder(Boolean.class) + .name("running") + .description("Set if the executor has started") + .defaultValue(Boolean.FALSE) + .reconfigurable(true) + .build(); + + protected final AtomicBoolean running = new AtomicBoolean(false); + protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + protected final Object mutex = new Object[0]; + + protected Effector effector; + + public AbstractScheduledEffectorPolicy() { + this(MutableMap.of()); + } + + public AbstractScheduledEffectorPolicy(Map props) { + super(props); + } + + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + effector = getEffector(); + + AttributeSensor sensor = config().get(START_SENSOR); + subscriptions().subscribe(entity, sensor, this); + } + + @Override + public void rebind() { + if (config().get(RUNNING)) { + running.set(true); + start(); + } + } + + @Override + protected void doReconfigureConfig(ConfigKey key, T val) { + if (key.isReconfigurable()) { + return; + } else { + throw new UnsupportedOperationException("Reconfiguring key " + key.getName() + " not supported on " + getClass().getSimpleName()); + } + } + + @Override + public void destroy(){ + super.destroy(); + executor.shutdownNow(); + } + + public abstract void start(); + + protected Effector getEffector() { + String effectorName = config().get(EFFECTOR); + Maybe> effector = entity.getEntityType().getEffectorByName(effectorName); + if (effector.isAbsentOrNull()) { + throw new IllegalStateException("Cannot find effector " + effectorName); + } + return effector.get(); + } + + protected Duration getWaitUntil(String time) { + if (time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { + return Duration.ZERO; + } + try { + Calendar now = Calendar.getInstance(); + Calendar when = Calendar.getInstance(); + boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion + Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000); + when.setTime(parsed); + when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE)); + if (when.before(now)) { + when.add(Calendar.DATE, 1); + } + return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis())); + } catch (ParseException | NumberFormatException e) { + LOG.warn("{}: Time should be formatted as {}: {}", new Object[] { this, TIME_FORMAT, e.getMessage() }); + throw Exceptions.propagate(e); + } + } + + @Override + public void run() { + synchronized (mutex) { + try { + ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag()); + Map args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS); + LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) }); + Map resolved = (Map) Tasks.resolving(args, Object.class) + .deep(true) + .context(entity) + .get(); + + LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved }); + Object result = entity.invoke(effector, resolved).getUnchecked(); + LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result }); + } catch (Throwable t) { + LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() }); + Exceptions.propagate(t); + } + } + } + + @Override + public void onEvent(SensorEvent event) { + synchronized (mutex) { + LOG.debug("{}: Got event {}", this, event); + AttributeSensor sensor = config().get(START_SENSOR); + if (event.getSensor().getName().equals(sensor.getName())) { + Boolean start = (Boolean) event.getValue(); + if (start && running.compareAndSet(false, true)) { + config().set(RUNNING, true); + start(); + } + } + } + } +} diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java new file mode 100644 index 0000000000..58c10f2a69 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.DurationPredicates; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; + +/** + * A {@link Policy} that executes an {@link Effector} at specific intervals. + *

+ * The following example shows a pair of policies that resize a cluster + * from one to ten entities during the day and back to one at night,: + *

{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 10
+ *       period: 1 day
+ *       time: 08:00:00
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 1
+ *       period: 1 day
+ *       time: 18:00:00
+ * }
+ */ +@Beta +public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(PeriodicEffectorPolicy.class); + + public static final ConfigKey PERIOD = ConfigKeys.builder(Duration.class) + .name("period") + .description("The duration between executions of this policy") + .constraint(DurationPredicates.positive()) + .defaultValue(Duration.hours(1)) + .build(); + + public PeriodicEffectorPolicy() { + this(MutableMap.of()); + } + + public PeriodicEffectorPolicy(Map props) { + super(props); + } + + @Override + public void setEntity(final EntityLocal entity) { + super.setEntity(entity); + } + + @Override + public void start() { + Duration period = Preconditions.checkNotNull(config().get(PERIOD), "The period must be configured for this policy"); + String time = config().get(TIME); + Duration wait = config().get(WAIT); + if (time != null) { + wait = getWaitUntil(time); + } else if (wait == null) { + wait = period; + } + + LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(), + Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) }); + executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS); + } +} diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java new file mode 100644 index 0000000000..44e6aacd8e --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.collect.Lists; +import com.google.common.reflect.TypeToken; + +/** + * A {@link Policy} the executes an {@link Effector} at a specific time in the future. + *

+ *

{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ *     brooklyn.config:
+ *       effector: update
+ *       time: 12:00:00
+ * }
+ */ +@Beta +public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class); + + public static final ConfigKey> SCHEDULED = ConfigKeys.builder(new TypeToken>() { }) + .name("scheduled") + .description("List of all scheduled execution start times") + .defaultValue(Lists.newCopyOnWriteArrayList()) + .reconfigurable(true) + .build(); + + public static final AttributeSensor INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true"); + public static final AttributeSensor INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time"); + + public ScheduledEffectorPolicy() { + this(MutableMap.of()); + } + + public ScheduledEffectorPolicy(Map props) { + super(props); + } + + @Override + public void setEntity(final EntityLocal entity) { + super.setEntity(entity); + + subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this); + subscriptions().subscribe(entity, INVOKE_AT, this); + } + + @Override + public void rebind() { + super.rebind(); + List scheduled = config().get(SCHEDULED); + for (Long when : scheduled) { + Duration wait = Duration.millis(when - System.currentTimeMillis()); + if (wait.isPositive()) { + schedule(wait); + } else { + scheduled.remove(when); + } + } + } + + @Override + public void start() { + String time = config().get(TIME); + Duration wait = config().get(WAIT); + + if (time != null) { + LOG.debug("{}: Scheduling {} at {} (in {})", + new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) }); + wait = getWaitUntil(time); + } + + if (wait != null) { + schedule(wait); + } + } + + protected void schedule(Duration wait) { + List scheduled = config().get(SCHEDULED); + scheduled.add(System.currentTimeMillis() + wait.toMilliseconds()); + + LOG.debug("{}: Scheduling {} in {} ({} ms)", + new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() }); + executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + @Override + public void onEvent(SensorEvent event) { + synchronized (mutex) { + super.onEvent(event); + + if (running.get()) { + if (event.getSensor().getName().equals(INVOKE_AT.getName())) { + String time = (String) event.getValue(); + if (time != null) { + schedule(getWaitUntil(time)); + } + } + if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) { + Boolean invoke = (Boolean) event.getValue(); + if (invoke) { + schedule(Duration.ZERO); + } + } + } + } + } + +} diff --git a/policy/src/main/resources/catalog.bom b/policy/src/main/resources/catalog.bom index d92b83634d..9c9e323fad 100644 --- a/policy/src/main/resources/catalog.bom +++ b/policy/src/main/resources/catalog.bom @@ -49,6 +49,21 @@ brooklyn.catalog: Policy that is attached to a Resizable entity and dynamically adjusts its size in response to either keep a metric within a given range, or in response to POOL_COLD and POOL_HOT events + - id: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy + itemType: policy + item: + type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy + name: Periodic Effector Execution + description: | + Policy that executes an effector repeatedly at configurable intervals. + - id: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy + itemType: policy + item: + type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy + name: Scheduled Effector Execution + description: | + Policy that executes an effector at a configurable time or after + a configurable delay. # Removed from catalog because 'FollowTheSunPool' cannot currently be configured via catalog mechanisms. # Also removing associated 'BalanceableWorkerPool' etc as they are only useful with 'FollowTheSunPool' diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java new file mode 100644 index 0000000000..e6ae74d5d1 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy.action; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + @Test + public void testPeriodicEffectorFires() { + final AttributeSensor start = Sensors.newBooleanSensor("start"); + + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); + + entity.sensors().set(start, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + int calls = entity.getCallHistory().size(); + Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + } + + @Test + public void testPeriodicEffectorFiresAfterDelay() { + final AttributeSensor start = Sensors.newBooleanSensor("start"); + + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); + + entity.sensors().set(start, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + int calls = entity.getCallHistory().size(); + Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + } + + private void sleep(Duration duration) { + try { + Thread.sleep(duration.toMilliseconds()); + } catch (InterruptedException ie) { + Exceptions.propagate(ie); + } + } +} diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java new file mode 100644 index 0000000000..daa186e6f4 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy.action; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + @Test + public void testScheduledEffectorFiresImmediately() { + final AttributeSensor start = Sensors.newBooleanSensor("start"); + + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(start, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + @Test + public void testScheduledEffectorFiresAfterDelay() { + final AttributeSensor start = Sensors.newBooleanSensor("start"); + + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(ScheduledEffectorPolicy.START_SENSOR, start))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(start, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + @Test + public void testScheduledEffectorFiresOnSensor() { + final AttributeSensor start = Sensors.newBooleanSensor("start"); + + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.START_SENSOR, start))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(start, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + + entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + private void sleep(Duration duration) { + try { + Thread.sleep(duration.toMilliseconds()); + } catch (InterruptedException ie) { + Exceptions.propagate(ie); + } + } +} From 0a71cf58b018cde15b2f71f8b97dc063b871ebd6 Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Fri, 15 Sep 2017 17:14:11 +0100 Subject: [PATCH 2/2] Fix issues with rebind --- .../AbstractScheduledEffectorPolicy.java | 114 ++++++++++++------ .../policy/action/PeriodicEffectorPolicy.java | 49 +++++--- .../action/ScheduledEffectorPolicy.java | 72 ++--------- .../action/PeriodicEffectorPolicyTest.java | 18 ++- .../action/ScheduledEffectorPolicyTest.java | 26 ++-- 5 files changed, 139 insertions(+), 140 deletions(-) diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java index 73438c6c3d..5978ec5b44 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java @@ -23,9 +23,11 @@ import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.brooklyn.api.effector.Effector; @@ -38,11 +40,11 @@ import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.trait.Startable; import org.apache.brooklyn.core.policy.AbstractPolicy; -import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.config.ResolvingConfigBag; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.DurationPredicates; @@ -53,6 +55,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; @Beta @@ -103,20 +106,35 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp .reconfigurable(true) .build(); - protected final AtomicBoolean running = new AtomicBoolean(false); - protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - protected final Object mutex = new Object[0]; + public static final ConfigKey> SCHEDULED = ConfigKeys.builder(new TypeToken>() { }) + .name("scheduled") + .description("List of all scheduled execution start times") + .defaultValue(Lists.newCopyOnWriteArrayList()) + .reconfigurable(true) + .build(); - protected Effector effector; + protected AtomicBoolean running; + protected ScheduledExecutorService executor; + protected Effector effector; public AbstractScheduledEffectorPolicy() { - this(MutableMap.of()); + LOG.debug("Created new scheduled effector policy"); + } + + @Override + public void init() { + setup(); } - public AbstractScheduledEffectorPolicy(Map props) { - super(props); + public void setup() { + if (executor != null) { + executor.shutdownNow(); + } + executor = Executors.newSingleThreadScheduledExecutor(); + running = new AtomicBoolean(false); } + @Override public void setEntity(EntityLocal entity) { super.setEntity(entity); @@ -128,9 +146,22 @@ public void setEntity(EntityLocal entity) { @Override public void rebind() { + setup(); + if (config().get(RUNNING)) { running.set(true); - start(); + } + + if (running.get()) { + List scheduled = config().get(SCHEDULED); + for (Long when : scheduled) { + Duration wait = Duration.millis(when - System.currentTimeMillis()); + if (wait.isPositive()) { + schedule(wait); + } else { + scheduled.remove(when); + } + } } } @@ -145,15 +176,15 @@ protected void doReconfigureConfig(ConfigKey key, T val) { @Override public void destroy(){ - super.destroy(); executor.shutdownNow(); + super.destroy(); } public abstract void start(); protected Effector getEffector() { String effectorName = config().get(EFFECTOR); - Maybe> effector = entity.getEntityType().getEffectorByName(effectorName); + Maybe> effector = getEntity().getEntityType().getEffectorByName(effectorName); if (effector.isAbsentOrNull()) { throw new IllegalStateException("Cannot find effector " + effectorName); } @@ -181,39 +212,46 @@ protected Duration getWaitUntil(String time) { } } + protected void schedule(Duration wait) { + List scheduled = config().get(SCHEDULED); + scheduled.add(System.currentTimeMillis() + wait.toMilliseconds()); + + executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS); + } + @Override - public void run() { - synchronized (mutex) { - try { - ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag()); - Map args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS); - LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) }); - Map resolved = (Map) Tasks.resolving(args, Object.class) - .deep(true) - .context(entity) - .get(); - - LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved }); - Object result = entity.invoke(effector, resolved).getUnchecked(); - LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result }); - } catch (Throwable t) { - LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() }); - Exceptions.propagate(t); - } + public synchronized void run() { + if (effector == null) return; + try { + ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag()); + Map args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS); + LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) }); + Map resolved = (Map) Tasks.resolving(args, Object.class) + .deep(true) + .context(entity) + .get(); + + LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved }); + Object result = entity.invoke(effector, resolved).getUnchecked(); + LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result }); + } catch (RuntimeInterruptedException rie) { + Thread.interrupted(); + // TODO sometimes this seems to hang the executor? + } catch (Throwable t) { + LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() }); + Exceptions.propagate(t); } } @Override public void onEvent(SensorEvent event) { - synchronized (mutex) { - LOG.debug("{}: Got event {}", this, event); - AttributeSensor sensor = config().get(START_SENSOR); - if (event.getSensor().getName().equals(sensor.getName())) { - Boolean start = (Boolean) event.getValue(); - if (start && running.compareAndSet(false, true)) { - config().set(RUNNING, true); - start(); - } + LOG.debug("{}: Got event {}", this, event); + AttributeSensor sensor = config().get(START_SENSOR); + if (event.getSensor().getName().equals(sensor.getName())) { + Boolean start = (Boolean) event.getValue(); + if (start && running.compareAndSet(false, true)) { + config().set(RUNNING, true); + start(); } } } diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java index 58c10f2a69..f2c0936cd4 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java @@ -18,18 +18,17 @@ */ package org.apache.brooklyn.policy.action; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.List; import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.DurationPredicates; -import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,16 +71,7 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy { .build(); public PeriodicEffectorPolicy() { - this(MutableMap.of()); - } - - public PeriodicEffectorPolicy(Map props) { - super(props); - } - - @Override - public void setEntity(final EntityLocal entity) { - super.setEntity(entity); + super(); } @Override @@ -95,8 +85,33 @@ public void start() { wait = period; } - LOG.debug("{}: Scheduling {} every {} in {}", new Object[] { PeriodicEffectorPolicy.this, effector.getName(), - Time.fromDurationToTimeStringRounded().apply(period), Time.fromDurationToTimeStringRounded().apply(wait) }); - executor.scheduleAtFixedRate(PeriodicEffectorPolicy.this, wait.toMilliseconds(), period.toMilliseconds(), TimeUnit.MILLISECONDS); + schedule(wait); + } + + @Override + public void rebind() { + super.rebind(); + + // Check if we missed an entire period + List scheduled = config().get(SCHEDULED); + if (running.get() && scheduled.isEmpty()) { + start(); + } + } + + @Override + public synchronized void run() { + try { + super.run(); + } finally { + Duration period = config().get(PERIOD); + String time = config().get(TIME); + if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { + schedule(period); + } else { + Duration wait = getWaitUntil(time); + schedule(wait.upperBound(period)); + } + } } } diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java index 44e6aacd8e..5a3bed7db8 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java @@ -19,27 +19,18 @@ package org.apache.brooklyn.policy.action; import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.time.Duration; -import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.Beta; -import com.google.common.collect.Lists; -import com.google.common.reflect.TypeToken; /** * A {@link Policy} the executes an {@link Effector} at a specific time in the future. @@ -57,22 +48,11 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy { private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class); - public static final ConfigKey> SCHEDULED = ConfigKeys.builder(new TypeToken>() { }) - .name("scheduled") - .description("List of all scheduled execution start times") - .defaultValue(Lists.newCopyOnWriteArrayList()) - .reconfigurable(true) - .build(); - public static final AttributeSensor INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true"); public static final AttributeSensor INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time"); public ScheduledEffectorPolicy() { - this(MutableMap.of()); - } - - public ScheduledEffectorPolicy(Map props) { - super(props); + super(); } @Override @@ -83,28 +63,12 @@ public void setEntity(final EntityLocal entity) { subscriptions().subscribe(entity, INVOKE_AT, this); } - @Override - public void rebind() { - super.rebind(); - List scheduled = config().get(SCHEDULED); - for (Long when : scheduled) { - Duration wait = Duration.millis(when - System.currentTimeMillis()); - if (wait.isPositive()) { - schedule(wait); - } else { - scheduled.remove(when); - } - } - } - @Override public void start() { String time = config().get(TIME); Duration wait = config().get(WAIT); if (time != null) { - LOG.debug("{}: Scheduling {} at {} (in {})", - new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) }); wait = getWaitUntil(time); } @@ -113,35 +77,23 @@ public void start() { } } - protected void schedule(Duration wait) { - List scheduled = config().get(SCHEDULED); - scheduled.add(System.currentTimeMillis() + wait.toMilliseconds()); - - LOG.debug("{}: Scheduling {} in {} ({} ms)", - new Object[] { this, effector.getName(), Time.fromDurationToTimeStringRounded().apply(wait), wait.toMilliseconds() }); - executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS); - } - @Override public void onEvent(SensorEvent event) { - synchronized (mutex) { - super.onEvent(event); + super.onEvent(event); - if (running.get()) { - if (event.getSensor().getName().equals(INVOKE_AT.getName())) { - String time = (String) event.getValue(); - if (time != null) { - schedule(getWaitUntil(time)); - } + if (running.get()) { + if (event.getSensor().getName().equals(INVOKE_AT.getName())) { + String time = (String) event.getValue(); + if (time != null) { + schedule(getWaitUntil(time)); } - if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) { - Boolean invoke = (Boolean) event.getValue(); - if (invoke) { - schedule(Duration.ZERO); - } + } + if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) { + Boolean invoke = (Boolean) event.getValue(); + if (invoke) { + schedule(Duration.ZERO); } } } } - } diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java index e6ae74d5d1..0268d60a23 100644 --- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java @@ -37,24 +37,24 @@ public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { + private static final AttributeSensor START = Sensors.newBooleanSensor("start"); + @Test public void testPeriodicEffectorFires() { - final AttributeSensor start = Sensors.newBooleanSensor("start"); - - final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(PeriodicEffectorPolicy.class) .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) .configure(PeriodicEffectorPolicy.TIME, "immediately") - .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); Asserts.assertTrue(entity.getCallHistory().isEmpty()); Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); - entity.sensors().set(start, Boolean.TRUE); + entity.sensors().set(START, Boolean.TRUE); Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); int calls = entity.getCallHistory().size(); @@ -63,22 +63,20 @@ public void testPeriodicEffectorFires() { @Test public void testPeriodicEffectorFiresAfterDelay() { - final AttributeSensor start = Sensors.newBooleanSensor("start"); - - final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(PeriodicEffectorPolicy.class) .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS) - .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); Asserts.assertTrue(entity.getCallHistory().isEmpty()); Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); - entity.sensors().set(start, Boolean.TRUE); + entity.sensors().set(START, Boolean.TRUE); Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); sleep(Duration.seconds(5)); Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java index daa186e6f4..5271de3188 100644 --- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java @@ -37,44 +37,42 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { + private static final AttributeSensor START = Sensors.newBooleanSensor("start"); + @Test public void testScheduledEffectorFiresImmediately() { - final AttributeSensor start = Sensors.newBooleanSensor("start"); - - final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(ScheduledEffectorPolicy.class) .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) .configure(ScheduledEffectorPolicy.TIME, "immediately") - .configure(PeriodicEffectorPolicy.START_SENSOR, start))); + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); Asserts.assertTrue(entity.getCallHistory().isEmpty()); Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); - entity.sensors().set(start, Boolean.TRUE); + entity.sensors().set(START, Boolean.TRUE); Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); } @Test public void testScheduledEffectorFiresAfterDelay() { - final AttributeSensor start = Sensors.newBooleanSensor("start"); - - final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(ScheduledEffectorPolicy.class) .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS) - .configure(ScheduledEffectorPolicy.START_SENSOR, start))); + .configure(ScheduledEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); Asserts.assertTrue(entity.getCallHistory().isEmpty()); Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); - entity.sensors().set(start, Boolean.TRUE); + entity.sensors().set(START, Boolean.TRUE); Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); sleep(Duration.seconds(5)); Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); @@ -84,20 +82,18 @@ public void testScheduledEffectorFiresAfterDelay() { @Test public void testScheduledEffectorFiresOnSensor() { - final AttributeSensor start = Sensors.newBooleanSensor("start"); - - final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(ScheduledEffectorPolicy.class) .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) - .configure(ScheduledEffectorPolicy.START_SENSOR, start))); + .configure(ScheduledEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); Asserts.assertTrue(entity.getCallHistory().isEmpty()); Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); - entity.sensors().set(start, Boolean.TRUE); + entity.sensors().set(START, Boolean.TRUE); Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); sleep(Duration.seconds(5)); Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector"));