From 8e8da944862cf5ef8c41eac049ea5f9215a62c93 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 7 Feb 2018 11:28:53 -0800 Subject: [PATCH] Use a separate snapshot file per lookup tier. (#5358) Prevents conflicts if two processes on the same machine use the same lookup snapshot directory but are in different tiers. --- .../query/lookup/LookupSnapshotTaker.java | 22 +++--- .../query/lookup/LookupReferencesManager.java | 7 +- .../lookup/LookupReferencesManagerTest.java | 30 ++++---- .../query/lookup/LookupSnapshotTakerTest.java | 74 ++++++------------- 4 files changed, 54 insertions(+), 79 deletions(-) diff --git a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java index 9162bf4b2035..52b69c03270c 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java +++ b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java @@ -19,14 +19,15 @@ package io.druid.query.lookup; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import io.druid.guice.annotations.Json; import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import java.io.File; @@ -34,16 +35,13 @@ import java.util.Collections; import java.util.List; - public class LookupSnapshotTaker { private static final Logger LOGGER = new Logger(LookupSnapshotTaker.class); - protected static final String PERSIST_FILE_NAME = "lookupSnapshot.json"; + private static final String PERSIST_FILE_SUFFIX = "lookupSnapshot.json"; private final ObjectMapper objectMapper; private final File persistDirectory; - private final File persistFile; - public LookupSnapshotTaker( final @Json ObjectMapper jsonMapper, @@ -62,11 +60,12 @@ public LookupSnapshotTaker( if (!this.persistDirectory.isDirectory()) { throw new ISE("Can only persist to directories, [%s] wasn't a directory", persistDirectory); } - this.persistFile = new File(persistDirectory, PERSIST_FILE_NAME); } - public synchronized List pullExistingSnapshot() + public synchronized List pullExistingSnapshot(final String tier) { + final File persistFile = getPersistFile(tier); + List lookupBeanList; try { if (!persistFile.isFile()) { @@ -84,8 +83,10 @@ public synchronized List pullExistingSnapshot() } } - public synchronized void takeSnapshot(List lookups) + public synchronized void takeSnapshot(String tier, List lookups) { + final File persistFile = getPersistFile(tier); + try { FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); } @@ -94,8 +95,9 @@ public synchronized void takeSnapshot(List lookups) } } - public File getPersistFile() + @VisibleForTesting + File getPersistFile(final String tier) { - return persistFile; + return new File(persistDirectory, StringUtils.format("%s.%s", tier, PERSIST_FILE_SUFFIX)); } } diff --git a/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java index df2229f9229b..c9ab30c35bae 100644 --- a/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java @@ -339,7 +339,7 @@ private void updateToLoadAndDrop( private void takeSnapshot(Map lookupMap) { if (lookupSnapshotTaker != null) { - lookupSnapshotTaker.takeSnapshot(getLookupBeanList(lookupMap)); + lookupSnapshotTaker.takeSnapshot(lookupListeningAnnouncerConfig.getLookupTier(), getLookupBeanList(lookupMap)); } } @@ -362,8 +362,7 @@ private List getLookupsList() { List lookupBeanList; if (lookupConfig.getEnableLookupSyncOnStartup()) { - String tier = lookupListeningAnnouncerConfig.getLookupTier(); - lookupBeanList = getLookupListFromCoordinator(tier); + lookupBeanList = getLookupListFromCoordinator(lookupListeningAnnouncerConfig.getLookupTier()); if (lookupBeanList == null) { LOG.info("Coordinator is unavailable. Loading saved snapshot instead"); lookupBeanList = getLookupListFromSnapshot(); @@ -455,7 +454,7 @@ private Map tryGetLookupListFromCoordin private List getLookupListFromSnapshot() { if (lookupSnapshotTaker != null) { - return lookupSnapshotTaker.pullExistingSnapshot(); + return lookupSnapshotTaker.pullExistingSnapshot(lookupListeningAnnouncerConfig.getLookupTier()); } return null; } diff --git a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java index 62860d4d4d4f..8cc51b91f0bf 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java @@ -102,7 +102,7 @@ public void testStartStop() throws InterruptedException, IOException lookupMap.put("testMockForStartStop", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -163,7 +163,7 @@ public void testAddGetRemove() throws Exception lookupMap.put("testMockForAddGetRemove", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -201,7 +201,7 @@ public void testCloseIsCalledAfterStopping() throws Exception lookupMap.put("testMockForCloseIsCalledAfterStopping", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -232,7 +232,7 @@ public void testCloseIsCalledAfterRemove() throws Exception lookupMap.put("testMockForCloseIsCalledAfterRemove", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -260,7 +260,7 @@ public void testGetNotThere() throws Exception lookupMap.put("testMockForGetNotThere", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -290,7 +290,7 @@ public void testUpdateWithHigherVersion() throws Exception lookupMap.put("testMockForUpdateWithHigherVersion", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -324,7 +324,7 @@ public void testUpdateWithLowerVersion() throws Exception lookupMap.put("testMockForUpdateWithLowerVersion", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -352,7 +352,7 @@ public void testRemoveNonExisting() throws Exception lookupMap.put("testMockForRemoveNonExisting", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -403,7 +403,7 @@ public void testGetAllLookupsState() throws Exception Map lookupMap = new HashMap<>(); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -445,7 +445,7 @@ public void testRealModeWithMainThread() throws Exception lookupMap.put("testMockForRealModeWithMainThread", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -521,7 +521,7 @@ public void testCoordinatorLookupSync() throws Exception lookupMap.put("testLookup3", container3); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request); @@ -547,7 +547,7 @@ public void testLoadLookupOnCoordinatorFailure() throws Exception lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request) @@ -565,13 +565,13 @@ public void testLoadLookupOnCoordinatorFailure() throws Exception lookupReferencesManager.handlePendingNotices(); lookupReferencesManager.stop(); lookupReferencesManager = new LookupReferencesManager( - new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()), + new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile(LOOKUP_TIER).getParent()), mapper, druidLeaderClient, config, true ); reset(config); reset(druidLeaderClient); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")) .andReturn(request) @@ -593,7 +593,7 @@ public void testDisableLookupSync() throws Exception lookupMap.put("testMockForDisableLookupSync", container); String strResult = mapper.writeValueAsString(lookupMap); Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); - expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); replay(config); expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) .andReturn(request); diff --git a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java index ac63bbdf01e3..fed17880abba 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java @@ -26,7 +26,6 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.segment.TestHelper; -import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -42,6 +41,9 @@ public class LookupSnapshotTakerTest { + private static final String TIER1 = "tier1"; + private static final String TIER2 = "tier2"; + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -65,68 +67,40 @@ public void setUp() throws IOException @Test public void testTakeSnapshotAndPullExisting() throws IOException { - - LookupBean lookupBean = new LookupBean( - "name", + LookupBean lookupBean1 = new LookupBean( + "name1", null, new LookupExtractorFactoryContainer( "v1", - new MapLookupExtractorFactory( - ImmutableMap.of( - "key", - "value" - ), true - ) + new MapLookupExtractorFactory(ImmutableMap.of("key", "value"), true) ) ); - List lookupBeanList = Lists.newArrayList(lookupBean); - lookupSnapshotTaker.takeSnapshot(lookupBeanList); - List actualList = lookupSnapshotTaker.pullExistingSnapshot(); - Assert.assertEquals(lookupBeanList, actualList); - } - - //test backward compatibility with snapshots stored using 0.9.x code - @Test - public void testBackwardCompatibility() throws IOException - { - File directory = temporaryFolder.newFolder(); - File snapshotFile = new File(directory, LookupSnapshotTaker.PERSIST_FILE_NAME); - Assert.assertFalse(snapshotFile.exists()); - FileUtils.write( - snapshotFile, - "[{\"factory\":{\"type\":\"map\",\"map\":{\"key\":\"value\"},\"isOneToOne\":true},\"name\":\"name\"}]" - ); - Assert.assertTrue(snapshotFile.exists()); - List actualList = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()).pullExistingSnapshot(); - - LookupBean lookupBean = new LookupBean( - "name", + LookupBean lookupBean2 = new LookupBean( + "name2", null, new LookupExtractorFactoryContainer( - null, - new MapLookupExtractorFactory( - ImmutableMap.of( - "key", - "value" - ), true - ) + "v1", + new MapLookupExtractorFactory(ImmutableMap.of("key", "value"), true) ) ); - List lookupBeanList = Lists.newArrayList(lookupBean); - - Assert.assertEquals(lookupBeanList, actualList); + List lookupBeanList1 = Lists.newArrayList(lookupBean1); + lookupSnapshotTaker.takeSnapshot(TIER1, lookupBeanList1); + List lookupBeanList2 = Lists.newArrayList(lookupBean2); + lookupSnapshotTaker.takeSnapshot(TIER2, lookupBeanList2); + Assert.assertEquals(lookupBeanList1, lookupSnapshotTaker.pullExistingSnapshot(TIER1)); + Assert.assertEquals(lookupBeanList2, lookupSnapshotTaker.pullExistingSnapshot(TIER2)); } @Test public void testIOExceptionDuringLookupPersist() throws IOException { File directory = temporaryFolder.newFolder(); - File snapshotFile = new File(directory, LookupSnapshotTaker.PERSIST_FILE_NAME); + LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()); + File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1); Assert.assertFalse(snapshotFile.exists()); Assert.assertTrue(snapshotFile.createNewFile()); Assert.assertTrue(snapshotFile.setReadOnly()); Assert.assertTrue(snapshotFile.getParentFile().setReadOnly()); - LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()); LookupBean lookupBean = new LookupBean( "name", null, @@ -144,25 +118,25 @@ public void testIOExceptionDuringLookupPersist() throws IOException expectedException.expect(ISE.class); expectedException.expectMessage("Exception during serialization of lookups"); - lookupSnapshotTaker.takeSnapshot(lookupBeanList); + lookupSnapshotTaker.takeSnapshot(TIER1, lookupBeanList); } @Test public void tesLookupPullingFromEmptyFile() throws IOException { - File snapshotFile = lookupSnapshotTaker.getPersistFile(); + File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1); Assert.assertTrue(snapshotFile.createNewFile()); - Assert.assertEquals(Collections.EMPTY_LIST, lookupSnapshotTaker.pullExistingSnapshot()); + Assert.assertEquals(Collections.EMPTY_LIST, lookupSnapshotTaker.pullExistingSnapshot(TIER1)); } @Test(expected = ISE.class) public void tesLookupPullingFromCorruptFile() throws IOException { - File snapshotFile = lookupSnapshotTaker.getPersistFile(); + File snapshotFile = lookupSnapshotTaker.getPersistFile(TIER1); Assert.assertTrue(snapshotFile.createNewFile()); byte[] bytes = StringUtils.toUtf8("test corrupt file"); Files.write(bytes, snapshotFile); - lookupSnapshotTaker.pullExistingSnapshot(); + lookupSnapshotTaker.pullExistingSnapshot(TIER1); } @Test @@ -170,7 +144,7 @@ public void testLookupPullingFromNonExistingFile() throws IOException { File directory = temporaryFolder.newFolder(); LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()); - List actualList = lookupSnapshotTaker.pullExistingSnapshot(); + List actualList = lookupSnapshotTaker.pullExistingSnapshot(TIER1); Assert.assertEquals(Collections.EMPTY_LIST, actualList); } }