Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions extensions-contrib/ambari-metrics-emitter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${codehaus.jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${codehaus.jackson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
Expand Down Expand Up @@ -969,116 +970,141 @@ private ListenableFuture<ZkWorker> addWorker(final Worker worker)
);

// Add status listener to the watcher for status changes
zkWorker.addListener(
(client, event) -> {
final String taskId;
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);

log.info(
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);

// Synchronizing state with ZK
statusLock.notifyAll();

final RemoteTaskRunnerWorkItem tmp;
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}

if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}
zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
zkWorker.start();
return retVal;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

if (announcement.getTaskStatus().isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
case CHILD_REMOVED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
} else {
log.info("Task[%s] went bye bye.", taskId);
}
break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
retVal.set(zkWorker);
} else {
final String message = StringUtils.format(
"WTF?! Tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
retVal.setException(new IllegalStateException(message));
}
runPendingTasks();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
@VisibleForTesting
PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
{
return (client, event) -> {
final String taskId;
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
if (event.getData() == null) {
log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
log.makeAlert("Unexpected null for event.getData() in handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("eventType", event.getType().toString())
.emit();
return;
}
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);

log.info(
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);

// Synchronizing state with ZK
statusLock.notifyAll();

final RemoteTaskRunnerWorkItem tmp;
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")

if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}

if (announcement.getTaskStatus().isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
case CHILD_REMOVED:
if (event.getData() == null) {
log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
log.makeAlert("Unexpected null for event.getData() in handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", event.getData().getPath())
.addData("eventType", event.getType().toString())
.emit();
return;
}
}
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
} else {
log.info("Task[%s] went bye bye.", taskId);
}
break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
retVal.set(zkWorker);
} else {
final String message = StringUtils.format(
"This should not happen...tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
retVal.setException(new IllegalStateException(message));
}
runPendingTasks();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
}
);
zkWorker.start();
return retVal;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
catch (Exception e) {
String znode = null;
if (event.getData() != null) {
znode = event.getData().getPath();
}
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", znode)
.addData("eventType", event.getType().toString())
.emit();
}
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
Expand All @@ -44,6 +45,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
Expand All @@ -55,6 +57,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -944,4 +947,37 @@ public void testSuccessfulTaskOnBlacklistedWorker() throws Exception
Assert.assertTrue(taskFuture2.get().isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
}

/**
 * Verifies that the status listener does not propagate an exception when it handles an event without data,
 * and that the failure is reported through an emitted alert whose "znode" entry is null.
 */
@Test
public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
{
  // Mock worker whose host is read by the status listener
  Worker worker = EasyMock.createMock(Worker.class);
  EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
  EasyMock.replay(worker);

  // Set up mock emitter to verify the alert emitted when an exception is thrown inside the status listener
  ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
  Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
  emitter.emit(EasyMock.capture(capturedArgument));
  EasyMock.expectLastCall().atLeastOnce();
  EmittingLogger.registerEmitter(emitter);
  EasyMock.replay(emitter);

  // try-with-resources so the cache (and its watchers) is closed even if an assertion fails
  try (PathChildrenCache cache = new PathChildrenCache(cf, "/test", true)) {
    testStartWithNoWorker();
    cache.getListenable().addListener(
        remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null)
    );
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

    // Status listener will receive an event with null data
    Assert.assertTrue(
        TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
    );

    // Verify that the log emitter was called
    EasyMock.verify(worker);
    EasyMock.verify(emitter);
    Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
    Assert.assertTrue(alertDataMap.containsKey("znode"));
    Assert.assertNull(alertDataMap.get("znode"));
    // Reaching this point means the status listener completed without throwing an exception
  }
}
}
2 changes: 1 addition & 1 deletion licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ name: Apache Curator
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 4.1.0
version: 4.3.0
libraries:
- org.apache.curator: curator-client
- org.apache.curator: curator-framework
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
<java.version>8</java.version>
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<aether.version>0.9.0.M2</aether.version>
<apache.curator.version>4.1.0</apache.curator.version>
<apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version>
<apache.kafka.version>2.2.2</apache.kafka.version>
<avatica.version>1.15.0</avatica.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,18 @@ public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> tInstanceFil
{
return this;
}

/**
 * No-op implementation: the supplied executor service is ignored and this builder is returned unchanged.
 *
 * @param executorService ignored
 * @return this builder
 */
@Override
public ServiceProviderBuilder<T> executorService(ExecutorService executorService)
{
  return this;
}

/**
 * No-op implementation: the supplied closeable executor service is ignored and this builder is returned
 * unchanged.
 *
 * @param closeableExecutorService ignored
 * @return this builder
 */
@Override
public ServiceProviderBuilder<T> executorService(CloseableExecutorService closeableExecutorService)
{
  return this;
}
}

private static class NoopServiceProvider<T> implements ServiceProvider<T>
Expand Down