diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java new file mode 100644 index 0000000000..5978ec5b44 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.EntityInitializers; +import org.apache.brooklyn.core.entity.trait.Startable; +import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.config.ResolvingConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.DurationPredicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.reflect.TypeToken; + +@Beta +public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy implements Runnable, SensorEventListener { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduledEffectorPolicy.class); + + public static final String TIME_FORMAT = "HH:mm:ss"; + public static final String NOW = "now"; + public static final String IMMEDIATELY = "immediately"; + + private static final DateFormat FORMATTER = SimpleDateFormat.getTimeInstance(); + + public static final ConfigKey EFFECTOR = ConfigKeys.builder(String.class) + .name("effector") + .description("The effector to be executed by this policy") + .constraint(Predicates.notNull()) + .build(); + + public static final ConfigKey> EFFECTOR_ARGUMENTS = ConfigKeys.builder(new TypeToken>() { }) + .name("args") + .description("The effector arguments and their values") + .constraint(Predicates.notNull()) + .defaultValue(ImmutableMap.of()) + .build(); + + public static final ConfigKey TIME = ConfigKeys.builder(String.class) + .name("time") + .description("An optional time when this policy should be first executed, formatted as HH:mm:ss") + .build(); + + public static final ConfigKey WAIT = ConfigKeys.builder(Duration.class) + .name("wait") + .description("An optional duration after which this policy should be first executed. The time config takes precedence if present") + .constraint(Predicates.or(Predicates.isNull(), DurationPredicates.positive())) + .build(); + + public static final ConfigKey> START_SENSOR = ConfigKeys.builder(new TypeToken>() { }) + .name("start.sensor") + .description("The sensor which should trigger starting the periodic execution scheduler") + .defaultValue(Startable.SERVICE_UP) + .build(); + + public static final ConfigKey RUNNING = ConfigKeys.builder(Boolean.class) + .name("running") + .description("Set if the executor has started") + .defaultValue(Boolean.FALSE) + .reconfigurable(true) + .build(); + + public static final ConfigKey> SCHEDULED = ConfigKeys.builder(new TypeToken>() { }) + .name("scheduled") + .description("List of all scheduled execution start times") + .defaultValue(Lists.newCopyOnWriteArrayList()) + .reconfigurable(true) + .build(); + + protected AtomicBoolean running; + protected ScheduledExecutorService executor; + protected Effector effector; + + public AbstractScheduledEffectorPolicy() { + LOG.debug("Created new scheduled effector policy"); + } + + @Override + public void init() { + setup(); + } + + public void setup() { + if (executor != null) { + executor.shutdownNow(); + } + executor = Executors.newSingleThreadScheduledExecutor(); + running = new AtomicBoolean(false); + } + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + effector = getEffector(); + + AttributeSensor sensor = config().get(START_SENSOR); + subscriptions().subscribe(entity, sensor, this); + } + + @Override + public void rebind() { + setup(); + + if (config().get(RUNNING)) { + running.set(true); + } + + if (running.get()) { + List scheduled = config().get(SCHEDULED); + for (Long when : scheduled) { + Duration wait = Duration.millis(when - System.currentTimeMillis()); + if (wait.isPositive()) { + schedule(wait); + } else { + scheduled.remove(when); + } + } + } + } + + @Override + protected void doReconfigureConfig(ConfigKey key, T val) { + if (key.isReconfigurable()) { + return; + } else { + throw new UnsupportedOperationException("Reconfiguring key " + key.getName() + " not supported on " + getClass().getSimpleName()); + } + } + + @Override + public void destroy(){ + executor.shutdownNow(); + super.destroy(); + } + + public abstract void start(); + + protected Effector getEffector() { + String effectorName = config().get(EFFECTOR); + Maybe> effector = getEntity().getEntityType().getEffectorByName(effectorName); + if (effector.isAbsentOrNull()) { + throw new IllegalStateException("Cannot find effector " + effectorName); + } + return effector.get(); + } + + protected Duration getWaitUntil(String time) { + if (time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { + return Duration.ZERO; + } + try { + Calendar now = Calendar.getInstance(); + Calendar when = Calendar.getInstance(); + boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion + Date parsed = formatted ? FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000); + when.setTime(parsed); + when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE)); + if (when.before(now)) { + when.add(Calendar.DATE, 1); + } + return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis())); + } catch (ParseException | NumberFormatException e) { + LOG.warn("{}: Time should be formatted as {}: {}", new Object[] { this, TIME_FORMAT, e.getMessage() }); + throw Exceptions.propagate(e); + } + } + + protected void schedule(Duration wait) { + List scheduled = config().get(SCHEDULED); + scheduled.add(System.currentTimeMillis() + wait.toMilliseconds()); + + executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + @Override + public synchronized void run() { + if (effector == null) return; + try { + ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag()); + Map args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS); + LOG.debug("{}: Resolving arguments for {}: {}", new Object[] { this, effector.getName(), Iterables.toString(args.keySet()) }); + Map resolved = (Map) Tasks.resolving(args, Object.class) + .deep(true) + .context(entity) + .get(); + + LOG.debug("{}: Invoking effector on {}, {}({})", new Object[] { this, entity, effector.getName(), resolved }); + Object result = entity.invoke(effector, resolved).getUnchecked(); + LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result }); + } catch (RuntimeInterruptedException rie) { + Thread.interrupted(); + // TODO sometimes this seems to hang the executor? + } catch (Throwable t) { + LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() }); + Exceptions.propagate(t); + } + } + + @Override + public void onEvent(SensorEvent event) { + LOG.debug("{}: Got event {}", this, event); + AttributeSensor sensor = config().get(START_SENSOR); + if (event.getSensor().getName().equals(sensor.getName())) { + Boolean start = (Boolean) event.getValue(); + if (start && running.compareAndSet(false, true)) { + config().set(RUNNING, true); + start(); + } + } + } +} diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java new file mode 100644 index 0000000000..f2c0936cd4 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.util.List; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.DurationPredicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; + +/** + * A {@link Policy} that executes an {@link Effector} at specific intervals. + *

+ * The following example shows a pair of policies that resize a cluster + * from one to ten entities during the day and back to one at night,: + *

{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 10
+ *       period: 1 day
+ *       time: 08:00:00
+ *   - type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy
+ *     brooklyn.config:
+ *       effector: resize
+ *       args:
+ *         desiredSize: 1
+ *       period: 1 day
+ *       time: 18:00:00
+ * }
+ */ +@Beta +public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(PeriodicEffectorPolicy.class); + + public static final ConfigKey PERIOD = ConfigKeys.builder(Duration.class) + .name("period") + .description("The duration between executions of this policy") + .constraint(DurationPredicates.positive()) + .defaultValue(Duration.hours(1)) + .build(); + + public PeriodicEffectorPolicy() { + super(); + } + + @Override + public void start() { + Duration period = Preconditions.checkNotNull(config().get(PERIOD), "The period must be configured for this policy"); + String time = config().get(TIME); + Duration wait = config().get(WAIT); + if (time != null) { + wait = getWaitUntil(time); + } else if (wait == null) { + wait = period; + } + + schedule(wait); + } + + @Override + public void rebind() { + super.rebind(); + + // Check if we missed an entire period + List scheduled = config().get(SCHEDULED); + if (running.get() && scheduled.isEmpty()) { + start(); + } + } + + @Override + public synchronized void run() { + try { + super.run(); + } finally { + Duration period = config().get(PERIOD); + String time = config().get(TIME); + if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { + schedule(period); + } else { + Duration wait = getWaitUntil(time); + schedule(wait.upperBound(period)); + } + } + } +} diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java new file mode 100644 index 0000000000..5a3bed7db8 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import java.util.Date; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; + +/** + * A {@link Policy} the executes an {@link Effector} at a specific time in the future. + *

+ *

{@code
+ * brooklyn.policies:
+ *   - type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy
+ *     brooklyn.config:
+ *       effector: update
+ *       time: 12:00:00
+ * }
+ */ +@Beta +public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class); + + public static final AttributeSensor INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true"); + public static final AttributeSensor INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time"); + + public ScheduledEffectorPolicy() { + super(); + } + + @Override + public void setEntity(final EntityLocal entity) { + super.setEntity(entity); + + subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this); + subscriptions().subscribe(entity, INVOKE_AT, this); + } + + @Override + public void start() { + String time = config().get(TIME); + Duration wait = config().get(WAIT); + + if (time != null) { + wait = getWaitUntil(time); + } + + if (wait != null) { + schedule(wait); + } + } + + @Override + public void onEvent(SensorEvent event) { + super.onEvent(event); + + if (running.get()) { + if (event.getSensor().getName().equals(INVOKE_AT.getName())) { + String time = (String) event.getValue(); + if (time != null) { + schedule(getWaitUntil(time)); + } + } + if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) { + Boolean invoke = (Boolean) event.getValue(); + if (invoke) { + schedule(Duration.ZERO); + } + } + } + } +} diff --git a/policy/src/main/resources/catalog.bom b/policy/src/main/resources/catalog.bom index d92b83634d..9c9e323fad 100644 --- a/policy/src/main/resources/catalog.bom +++ b/policy/src/main/resources/catalog.bom @@ -49,6 +49,21 @@ brooklyn.catalog: Policy that is attached to a Resizable entity and dynamically adjusts its size in response to either keep a metric within a given range, or in response to POOL_COLD and POOL_HOT events + - id: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy + itemType: policy + item: + type: org.apache.brooklyn.policy.action.PeriodicEffectorPolicy + name: Periodic Effector Execution + description: | + Policy that executes an effector repeatedly at configurable intervals. + - id: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy + itemType: policy + item: + type: org.apache.brooklyn.policy.action.ScheduledEffectorPolicy + name: Scheduled Effector Execution + description: | + Policy that executes an effector at a configurable time or after + a configurable delay. # Removed from catalog because 'FollowTheSunPool' cannot currently be configured via catalog mechanisms. # Also removing associated 'BalanceableWorkerPool' etc as they are only useful with 'FollowTheSunPool' diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java new file mode 100644 index 0000000000..0268d60a23 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy.action; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + private static final AttributeSensor START = Sensors.newBooleanSensor("start"); + + @Test + public void testPeriodicEffectorFires() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); + + entity.sensors().set(START, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + int calls = entity.getCallHistory().size(); + Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + } + + @Test + public void testPeriodicEffectorFiresAfterDelay() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); + + entity.sensors().set(START, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + int calls = entity.getCallHistory().size(); + Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + } + + private void sleep(Duration duration) { + try { + Thread.sleep(duration.toMilliseconds()); + } catch (InterruptedException ie) { + Exceptions.propagate(ie); + } + } +} diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java new file mode 100644 index 0000000000..5271de3188 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.brooklyn.policy.action; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + private static final AttributeSensor START = Sensors.newBooleanSensor("start"); + + @Test + public void testScheduledEffectorFiresImmediately() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(START, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + @Test + public void testScheduledEffectorFiresAfterDelay() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(ScheduledEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(START, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + @Test + public void testScheduledEffectorFiresOnSensor() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + Asserts.assertTrue(entity.getCallHistory().isEmpty()); + Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); + + entity.sensors().set(START, Boolean.TRUE); + Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); + sleep(Duration.seconds(5)); + Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + + entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE); + Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + } + + private void sleep(Duration duration) { + try { + Thread.sleep(duration.toMilliseconds()); + } catch (InterruptedException ie) { + Exceptions.propagate(ie); + } + } +}