Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,29 @@

package io.druid.query.lookup;


import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;


public class LookupSnapshotTaker
{
private static final Logger LOGGER = new Logger(LookupSnapshotTaker.class);
protected static final String PERSIST_FILE_NAME = "lookupSnapshot.json";
private static final String PERSIST_FILE_SUFFIX = "lookupSnapshot.json";

private final ObjectMapper objectMapper;
private final File persistDirectory;
private final File persistFile;


public LookupSnapshotTaker(
final @Json ObjectMapper jsonMapper,
Expand All @@ -62,11 +60,12 @@ public LookupSnapshotTaker(
if (!this.persistDirectory.isDirectory()) {
throw new ISE("Can only persist to directories, [%s] wasn't a directory", persistDirectory);
}
this.persistFile = new File(persistDirectory, PERSIST_FILE_NAME);
}

public synchronized List<LookupBean> pullExistingSnapshot()
public synchronized List<LookupBean> pullExistingSnapshot(final String tier)
{
final File persistFile = getPersistFile(tier);

List<LookupBean> lookupBeanList;
try {
if (!persistFile.isFile()) {
Expand All @@ -84,8 +83,10 @@ public synchronized List<LookupBean> pullExistingSnapshot()
}
}

public synchronized void takeSnapshot(List<LookupBean> lookups)
public synchronized void takeSnapshot(String tier, List<LookupBean> lookups)
{
final File persistFile = getPersistFile(tier);

try {
FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups));
}
Expand All @@ -94,8 +95,9 @@ public synchronized void takeSnapshot(List<LookupBean> lookups)
}
}

public File getPersistFile()
@VisibleForTesting
File getPersistFile(final String tier)
{
return persistFile;
return new File(persistDirectory, StringUtils.format("%s.%s", tier, PERSIST_FILE_SUFFIX));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ private void updateToLoadAndDrop(
private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap)
{
if (lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getLookupBeanList(lookupMap));
lookupSnapshotTaker.takeSnapshot(lookupListeningAnnouncerConfig.getLookupTier(), getLookupBeanList(lookupMap));
}
}

Expand All @@ -362,8 +362,7 @@ private List<LookupBean> getLookupsList()
{
List<LookupBean> lookupBeanList;
if (lookupConfig.getEnableLookupSyncOnStartup()) {
String tier = lookupListeningAnnouncerConfig.getLookupTier();
lookupBeanList = getLookupListFromCoordinator(tier);
lookupBeanList = getLookupListFromCoordinator(lookupListeningAnnouncerConfig.getLookupTier());
if (lookupBeanList == null) {
LOG.info("Coordinator is unavailable. Loading saved snapshot instead");
lookupBeanList = getLookupListFromSnapshot();
Expand Down Expand Up @@ -455,7 +454,7 @@ private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordin
private List<LookupBean> getLookupListFromSnapshot()
{
if (lookupSnapshotTaker != null) {
return lookupSnapshotTaker.pullExistingSnapshot();
return lookupSnapshotTaker.pullExistingSnapshot(lookupListeningAnnouncerConfig.getLookupTier());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testStartStop() throws InterruptedException, IOException
lookupMap.put("testMockForStartStop", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testAddGetRemove() throws Exception
lookupMap.put("testMockForAddGetRemove", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testCloseIsCalledAfterStopping() throws Exception
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -232,7 +232,7 @@ public void testCloseIsCalledAfterRemove() throws Exception
lookupMap.put("testMockForCloseIsCalledAfterRemove", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testGetNotThere() throws Exception
lookupMap.put("testMockForGetNotThere", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -290,7 +290,7 @@ public void testUpdateWithHigherVersion() throws Exception
lookupMap.put("testMockForUpdateWithHigherVersion", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -324,7 +324,7 @@ public void testUpdateWithLowerVersion() throws Exception
lookupMap.put("testMockForUpdateWithLowerVersion", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -352,7 +352,7 @@ public void testRemoveNonExisting() throws Exception
lookupMap.put("testMockForRemoveNonExisting", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testGetAllLookupsState() throws Exception
Map<String, Object> lookupMap = new HashMap<>();
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -445,7 +445,7 @@ public void testRealModeWithMainThread() throws Exception
lookupMap.put("testMockForRealModeWithMainThread", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand Down Expand Up @@ -521,7 +521,7 @@ public void testCoordinatorLookupSync() throws Exception
lookupMap.put("testLookup3", container3);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request);
Expand All @@ -547,7 +547,7 @@ public void testLoadLookupOnCoordinatorFailure() throws Exception
lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request)
Expand All @@ -565,13 +565,13 @@ public void testLoadLookupOnCoordinatorFailure() throws Exception
lookupReferencesManager.handlePendingNotices();
lookupReferencesManager.stop();
lookupReferencesManager = new LookupReferencesManager(
new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()),
new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent()),
mapper, druidLeaderClient, config,
true
);
reset(config);
reset(druidLeaderClient);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"))
.andReturn(request)
Expand All @@ -593,7 +593,7 @@ public void testDisableLookupSync() throws Exception
lookupMap.put("testMockForDisableLookupSync", container);
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
.andReturn(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -42,6 +41,9 @@

public class LookupSnapshotTakerTest
{
private static final String TIER1 = "tier1";
private static final String TIER2 = "tier2";

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

Expand All @@ -65,68 +67,40 @@ public void setUp() throws IOException
@Test
public void testTakeSnapshotAndPullExisting() throws IOException
{

LookupBean lookupBean = new LookupBean(
"name",
LookupBean lookupBean1 = new LookupBean(
"name1",
null,
new LookupExtractorFactoryContainer(
"v1",
new MapLookupExtractorFactory(
ImmutableMap.of(
"key",
"value"
), true
)
new MapLookupExtractorFactory(ImmutableMap.of("key", "value"), true)
)
);
List<LookupBean> lookupBeanList = Lists.newArrayList(lookupBean);
lookupSnapshotTaker.takeSnapshot(lookupBeanList);
List<LookupBean> actualList = lookupSnapshotTaker.pullExistingSnapshot();
Assert.assertEquals(lookupBeanList, actualList);
}

//test backward compatibility with snapshots stored using 0.9.x code
@Test
public void testBackwardCompatibility() throws IOException
{
File directory = temporaryFolder.newFolder();
File snapshotFile = new File(directory, LookupSnapshotTaker.PERSIST_FILE_NAME);
Assert.assertFalse(snapshotFile.exists());
FileUtils.write(
snapshotFile,
"[{\"factory\":{\"type\":\"map\",\"map\":{\"key\":\"value\"},\"isOneToOne\":true},\"name\":\"name\"}]"
);
Assert.assertTrue(snapshotFile.exists());
List<LookupBean> actualList = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()).pullExistingSnapshot();

LookupBean lookupBean = new LookupBean(
"name",
LookupBean lookupBean2 = new LookupBean(
"name2",
null,
new LookupExtractorFactoryContainer(
null,
new MapLookupExtractorFactory(
ImmutableMap.of(
"key",
"value"
), true
)
"v1",
new MapLookupExtractorFactory(ImmutableMap.of("key", "value"), true)
)
);
List<LookupBean> lookupBeanList = Lists.newArrayList(lookupBean);

Assert.assertEquals(lookupBeanList, actualList);
List<LookupBean> lookupBeanList1 = Lists.newArrayList(lookupBean1);
lookupSnapshotTaker.takeSnapshot(TIER1, lookupBeanList1);
List<LookupBean> lookupBeanList2 = Lists.newArrayList(lookupBean2);
lookupSnapshotTaker.takeSnapshot(TIER2, lookupBeanList2);
Assert.assertEquals(lookupBeanList1, lookupSnapshotTaker.pullExistingSnapshot(TIER1));
Assert.assertEquals(lookupBeanList2, lookupSnapshotTaker.pullExistingSnapshot(TIER2));
}

@Test
public void testIOExceptionDuringLookupPersist() throws IOException
{
File directory = temporaryFolder.newFolder();
File snapshotFile = new File(directory, LookupSnapshotTaker.PERSIST_FILE_NAME);
LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath());
File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1);
Assert.assertFalse(snapshotFile.exists());
Assert.assertTrue(snapshotFile.createNewFile());
Assert.assertTrue(snapshotFile.setReadOnly());
Assert.assertTrue(snapshotFile.getParentFile().setReadOnly());
LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath());
LookupBean lookupBean = new LookupBean(
"name",
null,
Expand All @@ -144,33 +118,33 @@ public void testIOExceptionDuringLookupPersist() throws IOException

expectedException.expect(ISE.class);
expectedException.expectMessage("Exception during serialization of lookups");
lookupSnapshotTaker.takeSnapshot(lookupBeanList);
lookupSnapshotTaker.takeSnapshot(TIER1, lookupBeanList);
}

@Test
public void tesLookupPullingFromEmptyFile() throws IOException
{
File snapshotFile = lookupSnapshotTaker.getPersistFile();
File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1);
Assert.assertTrue(snapshotFile.createNewFile());
Assert.assertEquals(Collections.EMPTY_LIST, lookupSnapshotTaker.pullExistingSnapshot());
Assert.assertEquals(Collections.EMPTY_LIST, lookupSnapshotTaker.pullExistingSnapshot(TIER1));
}

@Test(expected = ISE.class)
public void tesLookupPullingFromCorruptFile() throws IOException
{
File snapshotFile = lookupSnapshotTaker.getPersistFile();
File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1);
Assert.assertTrue(snapshotFile.createNewFile());
byte[] bytes = StringUtils.toUtf8("test corrupt file");
Files.write(bytes, snapshotFile);
lookupSnapshotTaker.pullExistingSnapshot();
lookupSnapshotTaker.pullExistingSnapshot(TIER1);
}

@Test
public void testLookupPullingFromNonExistingFile() throws IOException
{
File directory = temporaryFolder.newFolder();
LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath());
List<LookupBean> actualList = lookupSnapshotTaker.pullExistingSnapshot();
List<LookupBean> actualList = lookupSnapshotTaker.pullExistingSnapshot(TIER1);
Assert.assertEquals(Collections.EMPTY_LIST, actualList);
}
}