Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,16 @@ public ListenableFuture<TaskStatus> run(final Task task)
if (runningTask != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) {
log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!")
.addData("taskId", task.getId())
.emit();
runningTasks.remove(task.getId());
log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
}
return runningTask.getResult();
}

return runningTask.getResult();
}

RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
Expand Down Expand Up @@ -552,12 +550,15 @@ private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerW
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task)) {
statusLock.wait(config.getTaskAssignmentTimeout().getMillis());
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeout().getMillis()) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
"Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
theWorker.getHost(),
task.getId(),
elapsed,
config.getTaskAssignmentTimeout()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void unannounceTask(String taskId)
}
}

public void announceTask(TaskAnnouncement announcement)
public void announceTastAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
Expand Down Expand Up @@ -219,7 +219,7 @@ public void updateAnnouncement(TaskAnnouncement announcement)

try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTask(announcement);
announceTastAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
* <p/>
* The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
public class WorkerTaskMonitor
Expand Down Expand Up @@ -122,7 +122,7 @@ public void run()
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTask(
workerCuratorCoordinator.announceTastAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.indexing.common;

/**
*/
public interface IndexingServiceCondition
{
public boolean isValid();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,59 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.indexing;
package io.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import com.google.common.collect.Lists;
import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;

/**
*/
@JsonTypeName("test")
public class TestTask extends MergeTask
public class TestMergeTask extends MergeTask
{
private final TaskStatus status;
public static TestMergeTask createDummyTask(String taskId)
{
return new TestMergeTask(
taskId,
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList()
);
}

private final String id;

@JsonCreator
public TestTask(
public TestMergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("taskStatus") TaskStatus status
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(id, dataSource, segments, aggregators);
this.status = status;
this.id = id;
}

@Override
Expand All @@ -57,15 +79,9 @@ public String getType()
return "test";
}

@JsonProperty
public TaskStatus getStatus()
{
return status;
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return status;
return TaskStatus.running(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.indexing.coordinator;
package io.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
Expand All @@ -34,7 +32,7 @@
/**
*/
@JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask
public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
{
private final TaskStatus status;

Expand Down Expand Up @@ -66,6 +64,7 @@ public String getType()
return "test_realtime";
}

@Override
@JsonProperty
public TaskStatus getStatus()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.indexing.common;

import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;

/**
*/
public interface TestTask extends Task
{
public TaskStatus getStatus();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.indexing.common;

import com.google.common.base.Stopwatch;
import com.metamx.common.ISE;

import java.util.concurrent.TimeUnit;

/**
*/
public class TestUtils
{
public static boolean conditionValid(IndexingServiceCondition condition)
{
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (!condition.isValid()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
}
catch (Exception e) {
return false;
}
return true;
}
}
Loading