From 576bf2ae86253050d39b29f1ffd3cf55d7309385 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 2 Apr 2020 19:10:41 -1000 Subject: [PATCH 01/11] Fix NPE in RemoteTaskRunner event handler causes JVM shutdown --- .../indexing/overlord/RemoteTaskRunner.java | 220 +++++++++--------- .../overlord/RemoteTaskRunnerTest.java | 47 ++++ licenses.yaml | 2 +- pom.xml | 2 +- 4 files changed, 165 insertions(+), 106 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index fe6e8be17677..f6f36e968216 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -45,6 +45,7 @@ import org.apache.commons.lang.mutable.MutableInt; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.curator.CuratorUtils; @@ -969,110 +970,7 @@ private ListenableFuture addWorker(final Worker worker) ); // Add status listener to the watcher for status changes - zkWorker.addListener( - (client, event) -> { - final String taskId; - final RemoteTaskRunnerWorkItem taskRunnerWorkItem; - synchronized (statusLock) { - try { - switch (event.getType()) { - case CHILD_ADDED: - case CHILD_UPDATED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskAnnouncement announcement = jsonMapper.readValue( - event.getData().getData(), TaskAnnouncement.class - ); - - log.info( - "Worker[%s] wrote %s status for task [%s] on [%s]", - zkWorker.getWorker().getHost(), - announcement.getTaskStatus().getStatusCode(), - taskId, - announcement.getTaskLocation() - ); - - // Synchronizing state with ZK - statusLock.notifyAll(); - - final RemoteTaskRunnerWorkItem tmp; - if ((tmp = runningTasks.get(taskId)) != null) { - taskRunnerWorkItem = tmp; - } else { - final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( - taskId, - announcement.getTaskType(), - zkWorker.getWorker(), - TaskLocation.unknown(), - announcement.getTaskDataSource() - ); - final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( - taskId, - newTaskRunnerWorkItem - ); - if (existingItem == null) { - log.warn( - "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", - zkWorker.getWorker().getHost(), - taskId - ); - taskRunnerWorkItem = newTaskRunnerWorkItem; - } else { - taskRunnerWorkItem = existingItem; - } - } - - if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { - taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); - TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); - } - - if (announcement.getTaskStatus().isComplete()) { - taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); - runPendingTasks(); - } - break; - case CHILD_REMOVED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - taskRunnerWorkItem = runningTasks.remove(taskId); - if (taskRunnerWorkItem != null) { - log.info("Task[%s] just disappeared!", taskId); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); - TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); - } else { - log.info("Task[%s] went bye bye.", taskId); - } - break; - case INITIALIZED: - if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { - retVal.set(zkWorker); - } else { - final String message = StringUtils.format( - "WTF?! Tried to add already-existing worker[%s]", - worker.getHost() - ); - log.makeAlert(message) - .addData("workerHost", worker.getHost()) - .addData("workerIp", worker.getIp()) - .emit(); - retVal.setException(new IllegalStateException(message)); - } - runPendingTasks(); - break; - case CONNECTION_SUSPENDED: - case CONNECTION_RECONNECTED: - case CONNECTION_LOST: - // do nothing - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to handle new worker status") - .addData("worker", zkWorker.getWorker().getHost()) - .addData("znode", event.getData().getPath()) - .emit(); - } - } - } - ); + zkWorker.addListener(getStatusListener(worker, zkWorker, retVal)); zkWorker.start(); return retVal; } @@ -1081,6 +979,120 @@ private ListenableFuture addWorker(final Worker worker) } } + @VisibleForTesting + PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture retVal) { + return (client, event) -> { + final String taskId; + final RemoteTaskRunnerWorkItem taskRunnerWorkItem; + synchronized (statusLock) { + try { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + final TaskAnnouncement announcement = jsonMapper.readValue( + event.getData().getData(), TaskAnnouncement.class + ); + + log.info( + "Worker[%s] wrote %s status for task [%s] on [%s]", + zkWorker.getWorker().getHost(), + announcement.getTaskStatus().getStatusCode(), + taskId, + announcement.getTaskLocation() + ); + + // Synchronizing state with ZK + statusLock.notifyAll(); + + final RemoteTaskRunnerWorkItem tmp; + if ((tmp = runningTasks.get(taskId)) != null) { + taskRunnerWorkItem = tmp; + } else { + final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + taskId, + announcement.getTaskType(), + zkWorker.getWorker(), + TaskLocation.unknown(), + announcement.getTaskDataSource() + ); + final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( + taskId, + newTaskRunnerWorkItem + ); + if (existingItem == null) { + log.warn( + "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", + zkWorker.getWorker().getHost(), + taskId + ); + taskRunnerWorkItem = newTaskRunnerWorkItem; + } else { + taskRunnerWorkItem = existingItem; + } + } + + if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { + taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + + if (announcement.getTaskStatus().isComplete()) { + taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); + runPendingTasks(); + } + break; + case CHILD_REMOVED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + taskRunnerWorkItem = runningTasks.remove(taskId); + if (taskRunnerWorkItem != null) { + log.info("Task[%s] just disappeared!", taskId); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); + TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); + } else { + log.info("Task[%s] went bye bye.", taskId); + } + break; + case INITIALIZED: + if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { + retVal.set(zkWorker); + } else { + final String message = StringUtils.format( + "WTF?! Tried to add already-existing worker[%s]", + worker.getHost() + ); + log.makeAlert(message) + .addData("workerHost", worker.getHost()) + .addData("workerIp", worker.getIp()) + .emit(); + retVal.setException(new IllegalStateException(message)); + } + runPendingTasks(); + break; + case CONNECTION_SUSPENDED: + case CONNECTION_RECONNECTED: + case CONNECTION_LOST: + // do nothing + } + } + catch (Exception e) { + String workerHost = null; + String znode = null; + if (zkWorker != null && zkWorker.getWorker() != null) { + workerHost = zkWorker.getWorker().getHost(); + } + if (event != null && event.getData() != null) { + znode = event.getData().getPath(); + } + log.makeAlert(e, "Failed to handle new worker status") + .addData("worker", workerHost) + .addData("znode", znode) + .emit(); + } + } + }; + } + /** * We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without * dropping themselves and re-announcing. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 060164887b92..e7b2ead6a1ec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -28,7 +28,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.Timing; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IndexingServiceCondition; @@ -39,11 +45,15 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.testing.DeadlockDetectingTimeout; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -55,10 +65,16 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; + public class RemoteTaskRunnerTest { private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER; @@ -944,4 +960,35 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); } + + @Test + public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception + { + // Set up mock emitter to verify log alert when exception is thrown inside the status listener + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + Capture capturedArgument = Capture.newInstance(); + emitter.emit(capture(capturedArgument)); + expectLastCall().atLeastOnce(); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + + PathChildrenCache cache = new PathChildrenCache(cf, "/test", true); + testStartWithNoWorker(); + cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, null, SettableFuture.create())); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + + // Status listener will recieve event with null data + Assert.assertTrue( + TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1) + ); + + // Verify that the log emitter was called + EasyMock.verify(emitter); + Map alertDataMap = capturedArgument.getValue().build(null).getDataMap(); + Assert.assertTrue(alertDataMap.containsKey("worker")); + Assert.assertTrue(alertDataMap.containsKey("znode")); + Assert.assertNull(alertDataMap.get("worker")); + Assert.assertNull(alertDataMap.get("znode")); + // Status listener should successfully completes without throwing exception + } } diff --git a/licenses.yaml b/licenses.yaml index 7af752619474..2e9ce9ddc5f3 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1304,7 +1304,7 @@ name: Apache Curator license_category: binary module: java-core license_name: Apache License version 2.0 -version: 4.1.0 +version: 4.2.0 libraries: - org.apache.curator: curator-client - org.apache.curator: curator-framework diff --git a/pom.xml b/pom.xml index 6d28e2f42658..66bd483deb47 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 8 UTF-8 0.9.0.M2 - 4.1.0 + 4.2.0 2.12.0 2.2.2 1.15.0 From 82c979e994e74bf2add2b0de537c101f9fc7f844 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 10:32:44 -1000 Subject: [PATCH 02/11] address comments --- licenses.yaml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/licenses.yaml b/licenses.yaml index 2e9ce9ddc5f3..f3a911d224c8 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1304,7 +1304,7 @@ name: Apache Curator license_category: binary module: java-core license_name: Apache License version 2.0 -version: 4.2.0 +version: 4.3.0 libraries: - org.apache.curator: curator-client - org.apache.curator: curator-framework diff --git a/pom.xml b/pom.xml index 66bd483deb47..391ac8e293bc 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 8 UTF-8 0.9.0.M2 - 4.2.0 + 4.3.0 2.12.0 2.2.2 1.15.0 From 51106421b12a43b7f27bc2db0c033997378c85cc Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 10:46:11 -1000 Subject: [PATCH 03/11] fix compile --- .../druid/curator/discovery/DiscoveryModule.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java index fd899640c67f..78c56691967d 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java @@ -482,6 +482,18 @@ public ServiceProviderBuilder additionalFilter(InstanceFilter tInstanceFil { return this; } + + @Override + public ServiceProviderBuilder executorService(ExecutorService executorService) + { + return this; + } + + @Override + public ServiceProviderBuilder executorService(CloseableExecutorService closeableExecutorService) + { + return this; + } } private static class NoopServiceProvider implements ServiceProvider From 376b9b243b37dc9dd65d6350cb8ee7493b11ec47 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 13:31:52 -1000 Subject: [PATCH 04/11] fix checkstyle --- .../indexing/overlord/RemoteTaskRunner.java | 5 +++-- .../indexing/overlord/RemoteTaskRunnerTest.java | 16 ++-------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index f6f36e968216..269cb0dec275 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -980,7 +980,8 @@ private ListenableFuture addWorker(final Worker worker) } @VisibleForTesting - PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture retVal) { + PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture retVal) + { return (client, event) -> { final String taskId; final RemoteTaskRunnerWorkItem taskRunnerWorkItem; @@ -1058,7 +1059,7 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker retVal.set(zkWorker); } else { final String message = StringUtils.format( - "WTF?! Tried to add already-existing worker[%s]", + "This should not happen...tried to add already-existing worker[%s]", worker.getHost() ); log.makeAlert(message) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index e7b2ead6a1ec..108278f084fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -30,11 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.retry.RetryOneTime; -import org.apache.curator.test.TestingCluster; -import org.apache.curator.test.Timing; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IndexingServiceCondition; @@ -45,15 +41,12 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; -import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.testing.DeadlockDetectingTimeout; import org.easymock.Capture; -import org.easymock.CaptureType; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -70,11 +63,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.newCapture; - public class RemoteTaskRunnerTest { private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER; @@ -967,8 +955,8 @@ public void testStatusListenerEventDataNullShouldNotThrowException() throws Exce // Set up mock emitter to verify log alert when exception is thrown inside the status listener ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); Capture capturedArgument = Capture.newInstance(); - emitter.emit(capture(capturedArgument)); - expectLastCall().atLeastOnce(); + emitter.emit(EasyMock.capture(capturedArgument)); + EasyMock.expectLastCall().atLeastOnce(); EmittingLogger.registerEmitter(emitter); EasyMock.replay(emitter); From 88c5286e29ddff853e08a65c370a29ad0d8f3d12 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 13:54:34 -1000 Subject: [PATCH 05/11] fix lgtm --- extensions-contrib/ambari-metrics-emitter/pom.xml | 10 ++++++++++ .../druid/indexing/overlord/RemoteTaskRunner.java | 14 +++++--------- pom.xml | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 7cd69ea7a67d..7718c882fd13 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -48,6 +48,16 @@ test-jar test + + org.codehaus.jackson + jackson-core-asl + ${codehaus.jackson.version} + + + org.codehaus.jackson + jackson-mapper-asl + ${codehaus.jackson.version} + org.apache.ambari ambari-metrics-common diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 269cb0dec275..d676c639fd09 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -987,12 +987,12 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker final RemoteTaskRunnerWorkItem taskRunnerWorkItem; synchronized (statusLock) { try { - switch (event.getType()) { + switch (event.getType()) { // lgtm [java/dereferenced-value-may-be-null] case CHILD_ADDED: case CHILD_UPDATED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] final TaskAnnouncement announcement = jsonMapper.readValue( - event.getData().getData(), TaskAnnouncement.class + event.getData().getData(), TaskAnnouncement.class // lgtm [java/dereferenced-value-may-be-null] ); log.info( @@ -1044,7 +1044,7 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker } break; case CHILD_REMOVED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] taskRunnerWorkItem = runningTasks.remove(taskId); if (taskRunnerWorkItem != null) { log.info("Task[%s] just disappeared!", taskId); @@ -1077,16 +1077,12 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker } } catch (Exception e) { - String workerHost = null; String znode = null; - if (zkWorker != null && zkWorker.getWorker() != null) { - workerHost = zkWorker.getWorker().getHost(); - } if (event != null && event.getData() != null) { znode = event.getData().getPath(); } log.makeAlert(e, "Failed to handle new worker status") - .addData("worker", workerHost) + .addData("worker", zkWorker.getWorker().getHost()) .addData("znode", znode) .emit(); } diff --git a/pom.xml b/pom.xml index 391ac8e293bc..6d28e2f42658 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 8 UTF-8 0.9.0.M2 - 4.3.0 + 4.1.0 2.12.0 2.2.2 1.15.0 From 17da2c587ed1a1c61926c05740b81068d7d8cbae Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 15:38:55 -1000 Subject: [PATCH 06/11] fix merge --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6e43316e21d9..7d296e304ed4 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 8 UTF-8 0.9.0.M2 - 4.1.0 + 4.3.0 2.12.0 2.2.2 1.15.0 From 868176850bab929a43a7e1f3a8de4c48047afda4 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 15:54:25 -1000 Subject: [PATCH 07/11] fix test --- .../druid/indexing/overlord/RemoteTaskRunnerTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 108278f084fb..b23e0638c228 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.druid.indexer.TaskState; @@ -953,6 +952,9 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception { // Set up mock emitter to verify log alert when exception is thrown inside the status listener + Worker worker = EasyMock.createMock(Worker.class); + EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce(); + EasyMock.replay(worker); ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); Capture capturedArgument = Capture.newInstance(); emitter.emit(EasyMock.capture(capturedArgument)); @@ -962,7 +964,7 @@ public void testStatusListenerEventDataNullShouldNotThrowException() throws Exce PathChildrenCache cache = new PathChildrenCache(cf, "/test", true); testStartWithNoWorker(); - cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, null, SettableFuture.create())); + cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null)); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); // Status listener will recieve event with null data @@ -971,11 +973,10 @@ public void testStatusListenerEventDataNullShouldNotThrowException() throws Exce ); // Verify that the log emitter was called + EasyMock.verify(worker); EasyMock.verify(emitter); Map alertDataMap = capturedArgument.getValue().build(null).getDataMap(); - Assert.assertTrue(alertDataMap.containsKey("worker")); Assert.assertTrue(alertDataMap.containsKey("znode")); - Assert.assertNull(alertDataMap.get("worker")); Assert.assertNull(alertDataMap.get("znode")); // Status listener should successfully completes without throwing exception } From 258bfe476f5377789faec9bbf6ab9e1869110f99 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 18:00:16 -1000 Subject: [PATCH 08/11] fix tests --- extensions-contrib/ambari-metrics-emitter/pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 457a6a198c53..7c74a8529e7e 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -52,11 +52,13 @@ org.codehaus.jackson jackson-core-asl ${codehaus.jackson.version} + runtime org.codehaus.jackson jackson-mapper-asl ${codehaus.jackson.version} + runtime org.apache.ambari From d6b76ba077c058107b655699004f435a9f8ba554 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 21:29:12 -1000 Subject: [PATCH 09/11] change scope --- extensions-contrib/ambari-metrics-emitter/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 7c74a8529e7e..053240eb1782 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -52,13 +52,13 @@ org.codehaus.jackson jackson-core-asl ${codehaus.jackson.version} - runtime + test org.codehaus.jackson jackson-mapper-asl ${codehaus.jackson.version} - runtime + test org.apache.ambari From 0e62d3da2abb5d05a7bf89f304c8031a2c1a5163 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 3 Apr 2020 21:42:39 -1000 Subject: [PATCH 10/11] address comments --- .../apache/druid/indexing/overlord/RemoteTaskRunner.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index d676c639fd09..91ee09cc2b66 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1078,12 +1078,17 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker } catch (Exception e) { String znode = null; - if (event != null && event.getData() != null) { - znode = event.getData().getPath(); + String eventType = null; + if (event != null) { + if (event.getData() != null) { + znode = event.getData().getPath(); + } + eventType = event.getType().toString(); } log.makeAlert(e, "Failed to handle new worker status") .addData("worker", zkWorker.getWorker().getHost()) .addData("znode", znode) + .addData("eventType", eventType) .emit(); } } From fb9ced297bdf033e5e235b26dbcc874501d13f75 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 6 Apr 2020 21:41:35 -1000 Subject: [PATCH 11/11] address comments --- .../ambari-metrics-emitter/pom.xml | 24 ++++++------- .../indexing/overlord/RemoteTaskRunner.java | 34 +++++++++++++------ 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 053240eb1782..aadf49d948e9 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -48,18 +48,6 @@ test-jar test - - org.codehaus.jackson - jackson-core-asl - ${codehaus.jackson.version} - test - - - org.codehaus.jackson - jackson-mapper-asl - ${codehaus.jackson.version} - test - org.apache.ambari ambari-metrics-common @@ -143,6 +131,18 @@ JUnitParams test + + org.codehaus.jackson + jackson-core-asl + ${codehaus.jackson.version} + test + + + org.codehaus.jackson + jackson-mapper-asl + ${codehaus.jackson.version} + test + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 91ee09cc2b66..dbaadf98e9d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -987,12 +987,20 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker final RemoteTaskRunnerWorkItem taskRunnerWorkItem; synchronized (statusLock) { try { - switch (event.getType()) { // lgtm [java/dereferenced-value-may-be-null] + switch (event.getType()) { case CHILD_ADDED: case CHILD_UPDATED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] + if (event.getData() == null) { + log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString()); + log.makeAlert("Unexpected null for event.getData() in handle new worker status") + .addData("worker", zkWorker.getWorker().getHost()) + .addData("eventType", event.getType().toString()) + .emit(); + return; + } + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); final TaskAnnouncement announcement = jsonMapper.readValue( - event.getData().getData(), TaskAnnouncement.class // lgtm [java/dereferenced-value-may-be-null] + event.getData().getData(), TaskAnnouncement.class ); log.info( @@ -1044,7 +1052,15 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker } break; case CHILD_REMOVED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] + if (event.getData() == null) { + log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString()); + log.makeAlert("Unexpected null for event.getData() in handle new worker status") + .addData("worker", zkWorker.getWorker().getHost()) + .addData("eventType", event.getType().toString()) + .emit(); + return; + } + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); taskRunnerWorkItem = runningTasks.remove(taskId); if (taskRunnerWorkItem != null) { log.info("Task[%s] just disappeared!", taskId); @@ -1078,17 +1094,13 @@ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker } catch (Exception e) { String znode = null; - String eventType = null; - if (event != null) { - if (event.getData() != null) { - znode = event.getData().getPath(); - } - eventType = event.getType().toString(); + if (event.getData() != null) { + znode = event.getData().getPath(); } log.makeAlert(e, "Failed to handle new worker status") .addData("worker", zkWorker.getWorker().getHost()) .addData("znode", znode) - .addData("eventType", eventType) + .addData("eventType", event.getType().toString()) .emit(); } }