diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 2480c9689..50170b9fa 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -117,6 +117,7 @@ public void onRemove(Entity entity) throws FalconException { } Process process = (Process) entity; if (process.getSla() != null) { + LOG.debug("Removing process:{} from monitoring", process.getName()); backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name()); entityBacklogs.remove(entity); process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index 837a1701b..2f19e6bbd 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -112,7 +112,7 @@ void processSLACandidates(){ if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ return; } - LOG.trace("In processSLACandidates :" + pendingInstanceBeanList.size()); + LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); try{ for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { @@ -129,7 +129,7 @@ void processSLACandidates(){ if (schedulableEntityInstances.isEmpty()){ store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, entityType); - return; + continue; } List schedulableEntityList = new ArrayList<>(schedulableEntityInstances); SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 09671d929..d16a19e64 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -55,6 +55,7 @@ import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @@ -207,6 +208,7 @@ public void onRemove(Entity entity) throws FalconException { if (feed.getSla() != null && feed.getLocations() != null) { for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + LOG.debug("Removing feed:{} for monitoring", feed.getName()); MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString()); MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(), EntityType.FEED.toString()); @@ -218,6 +220,7 @@ public void onRemove(Entity entity) throws FalconException { Process process = (Process) entity; if (process.getSla() != null){ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + LOG.debug("Removing process:{} for monitoring", process.getName()); if (currentClusters.contains(cluster.getName())) { MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(), EntityType.PROCESS.toString()); @@ -316,6 +319,7 @@ public void init() throws FalconException { freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.parseInt(freq); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); 
addPendingEntityInstances(now()); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); @@ -364,6 +368,8 @@ public void run() { // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis); addPendingEntityInstances(newCheckPointTime); + } else { + LOG.debug("No entities present for sla monitoring."); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -449,8 +455,13 @@ private boolean checkEntityInstanceAvailability(String entityName, String cluste if (entityType.equalsIgnoreCase(EntityType.PROCESS.toString())){ LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); - AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); + if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt( + RuntimeProperties.get().getProperty("workflow.expiration.period", "7"))) { + return true; + } + + AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false); if (instancesResult.getInstances().length > 0) { diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties index 013ac1881..243d295b3 100644 --- a/src/conf/runtime.properties +++ b/src/conf/runtime.properties @@ -86,3 +86,6 @@ falcon.current.colo=local ### Timeout factor for processes ### instance.timeout.factor=5 + +### Workflow expiration period (in days) for oozie ### +workflow.expiration.period=7