From 622cae425e924db4bc5abeef09d8a184ca3a54c7 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Tue, 20 Dec 2016 10:19:20 +0530 Subject: [PATCH 1/7] FALCON-2225 extension owner added for trusted extensions --- .../java/org/apache/falcon/extensions/store/ExtensionStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index df63779f6..b59741dab 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -104,7 +104,7 @@ private void initializeDbTable() { String description = getShortDescription(extension); String recipeName = extension; String location = storePath.toString() + '/' + extension; - String extensionOwner = CurrentUser.getUser(); + String extensionOwner = System.getProperty("user.name"); metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner); } } catch (FalconException e) { From daa3ffc625e15dcc8a1243ab26f99b6c730fe9e2 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Tue, 20 Dec 2016 10:31:51 +0530 Subject: [PATCH 2/7] FALCON-2225 extension owner added for trusted extensions --- .../java/org/apache/falcon/extensions/store/ExtensionStore.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index b59741dab..72d449339 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -26,7 +26,6 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; From fdc9b8299a7f64ee259301adc8799cc558bd43d8 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Wed, 2 Aug 2017 10:52:57 +0530 Subject: [PATCH 3/7] FALCON-795 logging statements added --- .../apache/falcon/service/BacklogMetricEmitterService.java | 1 + .../java/org/apache/falcon/service/EntitySLAAlertService.java | 4 ++-- .../org/apache/falcon/service/EntitySLAMonitoringService.java | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 2480c9689..50170b9fa 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -117,6 +117,7 @@ public void onRemove(Entity entity) throws FalconException { } Process process = (Process) entity; if (process.getSla() != null) { + LOG.debug("Removing process:{} from monitoring", process.getName()); backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name()); entityBacklogs.remove(entity); process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index 837a1701b..2f19e6bbd 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -112,7 +112,7 @@ void processSLACandidates(){ if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ return; } - LOG.trace("In processSLACandidates :" + pendingInstanceBeanList.size()); + LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); try{ for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { @@ -129,7 +129,7 @@ void processSLACandidates(){ if (schedulableEntityInstances.isEmpty()){ store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, entityType); - return; + continue; } List schedulableEntityList = new ArrayList<>(schedulableEntityInstances); SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 09671d929..ec17464e2 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -207,6 +207,7 @@ public void onRemove(Entity entity) throws FalconException { if (feed.getSla() != null && feed.getLocations() != null) { for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + LOG.debug("Removing feed:{} for monitoring", feed.getName()); MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString()); MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(), EntityType.FEED.toString()); @@ -218,6 +219,7 @@ public void onRemove(Entity entity) throws FalconException { Process process = (Process) entity; if (process.getSla() != null){ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + LOG.debug("Removing process:{} for monitoring", process.getName()); if (currentClusters.contains(cluster.getName())) { MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(), EntityType.PROCESS.toString()); @@ -364,6 +366,8 @@ public void run() { // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis); addPendingEntityInstances(newCheckPointTime); + } else { + LOG.debug("No entities present for sla monitoring."); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); From 73d47e36103ab1cf54711fefa624903ed7504021 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Thu, 3 Aug 2017 12:34:06 +0000 Subject: [PATCH 4/7] test --- .../apache/falcon/service/EntitySLAMonitoringService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index ec17464e2..cc202a781 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.joda.time.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -318,6 +319,7 @@ public void init() throws FalconException { freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.parseInt(freq); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); addPendingEntityInstances(now()); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); @@ -453,8 +455,10 @@ private boolean checkEntityInstanceAvailability(String entityName, String cluste if (entityType.equalsIgnoreCase(EntityType.PROCESS.toString())){ LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); - AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); + Interval interval = new Interval(nominalTime.getTime(), new Date().getTime()); + if(nominalTime.) + AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false); if (instancesResult.getInstances().length > 0) { From 1bf815f0daa3d201914ead4904b3397330192912 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Fri, 4 Aug 2017 08:45:59 +0530 Subject: [PATCH 5/7] FALCON-795 logging statements added --- .../apache/falcon/service/EntitySLAMonitoringService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index cc202a781..c545e526b 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -55,6 +55,7 @@ import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @@ -455,8 +456,11 @@ private boolean checkEntityInstanceAvailability(String entityName, String cluste if (entityType.equalsIgnoreCase(EntityType.PROCESS.toString())){ LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); - Interval interval = new Interval(nominalTime.getTime(), new Date().getTime()); - if(nominalTime.) + + if((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt( + RuntimeProperties.get().getProperty("worklflow.expiration.period", "7"))) { + return true; + } AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, From afdf0bd31cd64ebf4837c5daa6bd603fe47cbca6 Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Fri, 4 Aug 2017 06:36:28 +0000 Subject: [PATCH 6/7] Removing unused imports --- .../org/apache/falcon/service/EntitySLAMonitoringService.java | 1 - src/conf/runtime.properties | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index c545e526b..b708ab845 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -63,7 +63,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.joda.time.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties index 013ac1881..243d295b3 100644 --- a/src/conf/runtime.properties +++ b/src/conf/runtime.properties @@ -86,3 +86,6 @@ falcon.current.colo=local ### Timeout factor for processes ### instance.timeout.factor=5 + +### Workflow expiration period for oozie ### +worklflow.expiration.period=7 From b367f34c477ab0039941dd110647075f723e1cef Mon Sep 17 00:00:00 2001 From: Pracheer Agarwal Date: Fri, 4 Aug 2017 07:36:23 +0000 Subject: [PATCH 7/7] checkstyle errors --- .../org/apache/falcon/service/EntitySLAMonitoringService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index b708ab845..d16a19e64 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -456,7 +456,7 @@ private boolean checkEntityInstanceAvailability(String entityName, String cluste LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); - if((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt( + if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt( RuntimeProperties.get().getProperty("worklflow.expiration.period", "7"))) { return true; }