Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(spec, "spec");
Preconditions.checkNotNull(spec.getId(), "spec.getId()");
Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
Expand Down Expand Up @@ -197,7 +198,7 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write
}

if (writeTombstone) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec()); // where NoopSupervisorSpec is a tombstone
metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, pair.rhs.getDataSources())); // where NoopSupervisorSpec is a tombstone
}
pair.lhs.stop(true);
supervisors.remove(id);
Expand Down Expand Up @@ -232,7 +233,7 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor
if (persistSpec) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec());
metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, spec.getDataSources()));
}
throw Throwables.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -65,7 +66,7 @@ public class SupervisorResource
return null;
}
if (supervisorSpec.getSpec().getDataSources() == null) {
return null;
return new ArrayList<>();
}
return Iterables.transform(
supervisorSpec.getSpec().getDataSources(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -288,7 +289,7 @@ public Supervisor createSupervisor()
@Override
public List<String> getDataSources()
{
return null;
return new ArrayList<>();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -287,6 +288,10 @@ public void testSpecGetAllHistory()
new VersionedSupervisorSpec(
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource1")),
"tombstone"
)
);
List<VersionedSupervisorSpec> versions2 = ImmutableList.of(
Expand All @@ -297,11 +302,42 @@ public void testSpecGetAllHistory()
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource2")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v3"
)
);
List<VersionedSupervisorSpec> versions3 = ImmutableList.of(
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource3")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource3")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource3")),
"v3"
)
);
Map<String, List<VersionedSupervisorSpec>> history = Maps.newHashMap();
history.put("id1", versions1);
history.put("id2", versions2);
history.put("id3", versions3);

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history);
Expand Down Expand Up @@ -344,6 +380,10 @@ public void testSpecGetAllHistoryWithAuthFailureFiltering()
new VersionedSupervisorSpec(
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource1")),
"tombstone"
)
);
List<VersionedSupervisorSpec> versions2 = ImmutableList.of(
Expand All @@ -354,12 +394,62 @@ public void testSpecGetAllHistoryWithAuthFailureFiltering()
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource2")),
"tombstone"
)
);
List<VersionedSupervisorSpec> versions3 = ImmutableList.of(
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v1"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource2")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource3")),
"tombstone"
)
);
List<VersionedSupervisorSpec> versions4 = ImmutableList.of(
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v3"
)
);

Map<String, List<VersionedSupervisorSpec>> history = Maps.newHashMap();
history.put("id1", versions1);
history.put("id2", versions2);
history.put("id3", versions3);
history.put("id4", versions4);

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history);
Expand All @@ -379,6 +469,32 @@ public void testSpecGetAllHistoryWithAuthFailureFiltering()

Map<String, List<VersionedSupervisorSpec>> filteredHistory = Maps.newHashMap();
filteredHistory.put("id1", versions1);
filteredHistory.put(
"id3",
ImmutableList.of(
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource3")),
"tombstone"
)
)
);
filteredHistory.put(
"id4",
ImmutableList.of(
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
)
)
);

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(filteredHistory, response.getEntity());
Expand All @@ -402,6 +518,10 @@ public void testSpecGetHistory()
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource1")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v2"
Expand All @@ -412,6 +532,10 @@ public void testSpecGetHistory()
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource2")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
Expand Down Expand Up @@ -464,6 +588,10 @@ public void testSpecGetHistoryWithAuthFailure() throws Exception
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource3")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id1", null, Arrays.asList("datasource1")),
"v2"
Expand All @@ -474,6 +602,10 @@ public void testSpecGetHistoryWithAuthFailure() throws Exception
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource2")),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id2", null, Arrays.asList("datasource2")),
"v2"
Expand All @@ -484,9 +616,25 @@ public void testSpecGetHistoryWithAuthFailure() throws Exception
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource2")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource3")),
"tombstone"
)
);
Map<String, List<VersionedSupervisorSpec>> history = Maps.newHashMap();
Expand Down Expand Up @@ -521,6 +669,22 @@ public void testSpecGetHistoryWithAuthFailure() throws Exception
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v1"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, null),
"tombstone"
),
new VersionedSupervisorSpec(
new TestSupervisorSpec("id3", null, Arrays.asList("datasource3")),
"v2"
),
new VersionedSupervisorSpec(
new NoopSupervisorSpec(null, Arrays.asList("datasource3")),
"tombstone"
)
),
response.getEntity()
Expand Down Expand Up @@ -574,6 +738,23 @@ public void testReset()
verifyAll();
}

@Test
public void testNoopSupervisorSpecSerde() throws Exception
{
ObjectMapper mapper = new ObjectMapper();
String oldSpec = "{\"type\":\"NoopSupervisorSpec\",\"id\":null,\"dataSources\":null}";
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
NoopSupervisorSpec deserializedSpec = mapper.readValue(oldSpec, NoopSupervisorSpec.class);
Assert.assertEquals(expectedSpec, deserializedSpec);

NoopSupervisorSpec spec1 = new NoopSupervisorSpec("abcd", Lists.newArrayList("defg"));
NoopSupervisorSpec spec2 = mapper.readValue(
mapper.writeValueAsBytes(spec1),
NoopSupervisorSpec.class
);
Assert.assertEquals(spec1, spec2);
}

private static class TestSupervisorSpec implements SupervisorSpec
{
private final String id;
Expand Down
Loading