From a21655a6707223f4f0f7c1836d8dd0a6f6ed26af Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 15:04:39 -0700 Subject: [PATCH 1/8] Fix race between canHandle() and addSegment() in StorageLocation --- .../worker/IntermediaryDataManager.java | 56 +++--- .../SegmentLoaderLocalCacheManager.java | 35 ++-- .../segment/loading/StorageLocation.java | 61 +++--- ...oaderLocalCacheManagerConcurrencyTest.java | 181 ++++++++++++++++++ .../segment/loading/StorageLocationTest.java | 26 ++- 5 files changed, 277 insertions(+), 82 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index bfd202e8835a..154c0187dd1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -63,10 +63,10 @@ * This class manages intermediary segments for data shuffle between native parallel index tasks. * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers * and phase 2 tasks read those files via HTTP. - * + *

* The directory where segment files are placed is structured as * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment. - * + *

* This class provides interfaces to store, find, and remove segment files. * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore. @@ -186,7 +186,7 @@ private void discoverSupervisorTaskPartitions() /** * Check supervisorTask status if its partitions have not been accessed in timeout and * delete all partitions for the supervisorTask if it is already finished. - * + *

* Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * the self-cleanup for when the cleanup request is missing. */ @@ -226,12 +226,11 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - * + *

* This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static * addSegment method. */ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) - throws IOException { final Iterator iterator = locationIterators.computeIfAbsent( supervisorTaskId, @@ -279,35 +278,32 @@ public static void addSegment( String subTaskId, DataSegment segment, File segmentFile - ) throws IOException - { - final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId - ); - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } - location.addFile(destFile); - } - - private static StorageLocation findLocationForSegment( - Iterator cyclicIterator, - int numLocations, - DataSegment segment ) { for (int i = 0; i < numLocations; i++) { final StorageLocation location = cyclicIterator.next(); - if (location.canHandle(segment)) { - return location; + final File destFile = new File( + getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), + subTaskId + ); + if (location.reserve(destFile, segment.getId().toString(), segmentFile.length())) { + try { + FileUtils.forceMkdirParent(destFile); + final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); + if (copiedBytes == 0) { + throw new IOE( + "0 bytes copied after copying a segment file from [%s] to [%s]", + segmentFile.getAbsolutePath(), + destFile.getAbsolutePath() + ); + } else { + return; + } + } + catch (Exception e) { + log.warn(e, "Failed to write segmentFile at [%s]", destFile); + location.removeFile(segmentFile); + } } } throw new ISE("Can't find location to handle segment[%s]", segment); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b25c2163f71d..5c45bfd7e797 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -163,9 +163,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } - final File localStorageDir = new File(loc.getPath(), storageDir); - loc.addSegmentDir(localStorageDir, segment); - return localStorageDir; + return new File(loc.getPath(), storageDir); } finally { unlock(segment, lock); @@ -181,23 +179,24 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - if (loc.canHandle(segment)) { - File storageDir = new File(loc.getPath(), storageDirStr); - + File storageDir = new File(loc.getPath(), storageDirStr); + if (loc.reserve(storageDir, segment)) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; } catch (SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); - - cleanupCacheFiles(loc.getPath(), storageDir); + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } } @@ -366,4 +365,10 @@ public ConcurrentHashMap getSegmentLocks() { return segmentLocks; } + + @VisibleForTesting + public List getLocations() + { + return locations; + } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 4fb4c4ea0bc7..e81d04d2ca66 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -19,8 +19,8 @@ package org.apache.druid.segment.loading; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; @@ -72,65 +72,72 @@ public long getMaxSize() } /** - * Add a new file to this location. The given file argument must be a file rather than directory. + * Remove a segment file from this location. The given file argument must be a file rather than directory. */ - public synchronized void addFile(File file) + public synchronized void removeFile(File file) { - if (file.isDirectory()) { - throw new ISE("[%s] must be a file. Use a"); - } - if (files.add(file)) { - currSize += FileUtils.sizeOf(file); + if (files.remove(file)) { + currSize -= FileUtils.sizeOf(file); } } /** - * Add a new segment dir to this location. The segment size is added to currSize. + * Remove a segment dir from this location. The segment size is subtracted from currSize. */ - public synchronized void addSegmentDir(File segmentDir, DataSegment segment) + public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { - if (files.add(segmentDir)) { - currSize += segment.getSize(); + if (files.remove(segmentDir)) { + currSize -= segment.getSize(); } } /** - * Remove a segment file from this location. The given file argument must be a file rather than directory. + * Reserves space to store the given segment. The segment size is added to currSize. + * Returns true if it succeeds to add the given file. */ - public synchronized void removeFile(File file) + public synchronized boolean reserve(File segmentDir, DataSegment segment) { - if (files.remove(file)) { - currSize -= FileUtils.sizeOf(file); - } + return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } /** - * Remove a segment dir from this location. The segment size is subtracted from currSize. + * Reserves space to store the given segment. Returns true if it succeeds to add the given file. */ - public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) + public synchronized boolean reserve(File segmentFileToAdd, String segmentId, long segmentSize) { - if (files.remove(segmentDir)) { - currSize -= segment.getSize(); + if (files.contains(segmentFileToAdd)) { + return false; + } + if (canHandle(segmentId, segmentSize)) { + files.add(segmentFileToAdd); + currSize += segmentSize; + return true; + } else { + return false; } } - public boolean canHandle(DataSegment segment) + /** + * This method is available for only unit tests. Production code must use {@link #reserve} instead. + */ + @VisibleForTesting + boolean canHandle(String segmentId, long segmentSize) { - if (available() < segment.getSize()) { + if (available() < segmentSize) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param", - segment.getId(), segment.getSize(), getPath(), available() + segmentId, segmentSize, getPath(), available() ); return false; } if (freeSpaceToKeep > 0) { long currFreeSpace = path.getFreeSpace(); - if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) { + if ((freeSpaceToKeep + segmentSize) > currFreeSpace) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].", - segment.getId(), - segment.getSize(), + segmentId, + segmentSize, getPath(), available(), freeSpaceToKeep, diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java new file mode 100644 index 000000000000..69ce71792bbc --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class SegmentLoaderLocalCacheManagerConcurrencyTest +{ + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper jsonMapper; + private final String dataSource = "test_ds"; + private final String segmentVersion; + + private File localSegmentCacheFolder; + private SegmentLoaderLocalCacheManager manager; + private ExecutorService executorService; + + public SegmentLoaderLocalCacheManagerConcurrencyTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue( + LocalDataSegmentPuller.class, + new LocalDataSegmentPuller() + ) + ); + segmentVersion = DateTimes.nowUtc().toString(); + } + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder"); + + final List locations = new ArrayList<>(); + // Each segment has the size of 1000 bytes. This deep storage is capable of storing up to 2 segments. + final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null); + locations.add(locationConfig); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + executorService = Execs.multiThreaded(4, "segment-loader-local-cache-manager-concurrency-test-%d"); + } + + @After + public void tearDown() + { + executorService.shutdownNow(); + } + + @Test + public void testGetSegment() throws IOException, ExecutionException, InterruptedException + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + final List segmentsToLoad = new ArrayList<>(4); + + final Interval interval = Intervals.of("2019-01-01/P1D"); + for (int partitionId = 0; partitionId < 4; partitionId++) { + final String segmentPath = Paths.get( + localStorageFolder.getCanonicalPath(), + dataSource, + StringUtils.format("%s_%s", interval.getStart().toString(), interval.getEnd().toString()), + segmentVersion, + String.valueOf(partitionId) + ).toString(); + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + segmentPath + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + + final DataSegment segment = newSegment(interval, partitionId).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localSegmentFile.getAbsolutePath() + ) + ); + segmentsToLoad.add(segment); + } + + final List futures = segmentsToLoad + .stream() + .map(segment -> executorService.submit(() -> manager.getSegmentFiles(segment))) + .collect(Collectors.toList()); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class)); + expectedException.expectMessage("Failed to load segment"); + for (Future future : futures) { + future.get(); + } + + System.out.println(manager.getLocations().get(0).available()); + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return DataSegment.builder() + .dataSource(dataSource) + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(segmentVersion) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NumberedShardSpec(partitionId, 0)) + .binaryVersion(9) + .size(1000L) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index cdfcd47ce8dc..6119fd66ca45 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -38,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000))); - Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000))); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -71,14 +72,14 @@ public void testStorageLocation() final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test2"), secondSegment); + loc.reserve(new File("test2"), secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); @@ -98,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(makeSegment("2013/2014", i))); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); } } @@ -116,4 +117,9 @@ private DataSegment makeSegment(String intervalString, long size) size ); } + + private SegmentId newSegmentId(String intervalString) + { + return SegmentId.of("test", Intervals.of(intervalString), "1", 0); + } } From d75b8de48e6c6bc989b36ef5ec381059fe828b9a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 15:19:42 -0700 Subject: [PATCH 2/8] add comment --- .../apache/druid/indexing/worker/IntermediaryDataManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index 154c0187dd1f..8ac988bb3820 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -301,6 +301,7 @@ public static void addSegment( } } catch (Exception e) { + // Print only log here to try other locations as well. log.warn(e, "Failed to write segmentFile at [%s]", destFile); location.removeFile(segmentFile); } From d7903f54f94f5bc236cea27891ab935496b2e34a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 20 Jul 2019 22:02:25 -0700 Subject: [PATCH 3/8] add comments --- .../worker/IntermediaryDataManager.java | 35 +++++++++++----- .../SegmentLoaderLocalCacheManager.java | 4 +- .../segment/loading/StorageLocation.java | 42 ++++++++++++++----- .../segment/loading/StorageLocationTest.java | 28 ++++++------- 4 files changed, 72 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index 8ac988bb3820..a66fda0a734a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -242,7 +243,7 @@ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment se public List findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) { for (StorageLocation location : shuffleDataLocations) { - final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId); + final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId)); if (partitionDir.exists()) { supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc()); final File[] segmentFiles = partitionDir.listFiles(); @@ -282,11 +283,17 @@ public static void addSegment( { for (int i = 0; i < numLocations; i++) { final StorageLocation location = cyclicIterator.next(); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId + final File destFile = location.reserve( + getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ), + segment.getId(), + segmentFile.length() ); - if (location.reserve(destFile, segment.getId().toString(), segmentFile.length())) { + if (destFile != null) { try { FileUtils.forceMkdirParent(destFile); final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); @@ -310,19 +317,27 @@ public static void addSegment( throw new ISE("Can't find location to handle segment[%s]", segment); } - private static File getPartitionDir( - StorageLocation location, + private static String getPartitionFilePath( + String supervisorTaskId, + String subTaskId, + Interval interval, + int partitionId + ) + { + return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId), subTaskId).toString(); + } + + private static String getPartitionDir( String supervisorTaskId, Interval interval, int partitionId ) { - return FileUtils.getFile( - location.getPath(), + return Paths.get( supervisorTaskId, interval.getStart().toString(), interval.getEnd().toString(), String.valueOf(partitionId) - ); + ).toString(); } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 5c45bfd7e797..fa2c6065f0d2 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -179,8 +179,8 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - File storageDir = new File(loc.getPath(), storageDirStr); - if (loc.reserve(storageDir, segment)) { + File storageDir = loc.reserve(storageDirStr, segment); + if (storageDir != null) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index e81d04d2ca66..f6733929c2c6 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import java.io.File; @@ -30,17 +31,27 @@ import java.util.Set; /** + * This class is a very simple logical representation of a local path. It keeps track of files stored under the + * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSize} and + * available space is always kept smaller than {@link #freeSpaceToKeep}. + * + * This class is thread-safe, so that multiple threads can update its state at the same time. + * One example usage is that a historical can use multiple threads to load different segments in parallel + * from deep storage. */ public class StorageLocation { private static final Logger log = new Logger(StorageLocation.class); private final File path; - private final long maxSize; + private final long maxSize; // in bytes private final long freeSpaceToKeep; + + // Set of files stored under the given path. All accesses must be synchronized with currSize. private final Set files = new HashSet<>(); - private volatile long currSize = 0; + // Current total size of files in bytes. All accesses must be synchronized with files. + private long currSize = 0; public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent) { @@ -78,6 +89,8 @@ public synchronized void removeFile(File file) { if (files.remove(file)) { currSize -= FileUtils.sizeOf(file); + } else { + log.warn("File[%s] is not found under this location[%s]", file, path); } } @@ -88,32 +101,39 @@ public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { if (files.remove(segmentDir)) { currSize -= segment.getSize(); + } else { + log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path); } } /** * Reserves space to store the given segment. The segment size is added to currSize. - * Returns true if it succeeds to add the given file. + * If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise. */ - public synchronized boolean reserve(File segmentDir, DataSegment segment) + @Nullable + public synchronized File reserve(String segmentDir, DataSegment segment) { - return reserve(segmentDir, segment.getId().toString(), segment.getSize()); + return reserve(segmentDir, segment.getId(), segment.getSize()); } /** - * Reserves space to store the given segment. Returns true if it succeeds to add the given file. + * Reserves space to store the given segment. + * If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location. + * Returns null otherwise. */ - public synchronized boolean reserve(File segmentFileToAdd, String segmentId, long segmentSize) + @Nullable + public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize) { + final File segmentFileToAdd = new File(path, segmentFilePathToAdd); if (files.contains(segmentFileToAdd)) { - return false; + return null; } if (canHandle(segmentId, segmentSize)) { files.add(segmentFileToAdd); currSize += segmentSize; - return true; + return segmentFileToAdd; } else { - return false; + return null; } } @@ -121,7 +141,7 @@ public synchronized boolean reserve(File segmentFileToAdd, String segmentId, lon * This method is available for only unit tests. Production code must use {@link #reserve} instead. */ @VisibleForTesting - boolean canHandle(String segmentId, long segmentSize) + boolean canHandle(SegmentId segmentId, long segmentSize) { if (available() < segmentSize) { log.warn( diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 6119fd66ca45..7321e08bab6a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -39,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); - Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); - Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); - Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -66,31 +66,31 @@ private StorageLocation fakeLocation(long total, long free, long max, Double per public void testStorageLocation() { long expectedAvail = 1000L; - StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null); + StorageLocation loc = new StorageLocation(new File(""), expectedAvail, null); verifyLoc(expectedAvail, loc); final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.reserve(new File("test2"), secondSegment); + loc.reserve("test2", secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail += 10; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test2"), secondSegment); + loc.removeSegmentDir(new File("/tmp/test2"), secondSegment); expectedAvail += 23; verifyLoc(expectedAvail, loc); } @@ -99,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); } } From 5a1acaaeccefa9e686da9d078d71f66f3dfe5da4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 21 Jul 2019 09:19:31 -0700 Subject: [PATCH 4/8] fix test --- .../org/apache/druid/segment/loading/StorageLocationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 7321e08bab6a..4c4ad813384a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -66,7 +66,7 @@ private StorageLocation fakeLocation(long total, long free, long max, Double per public void testStorageLocation() { long expectedAvail = 1000L; - StorageLocation loc = new StorageLocation(new File(""), expectedAvail, null); + StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null); verifyLoc(expectedAvail, loc); From a8e894f268fb2c51c4fe5b6b1f7988b31ad4eb97 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 23 Jul 2019 16:24:58 -0700 Subject: [PATCH 5/8] address comments --- .../worker/IntermediaryDataManager.java | 2 +- .../SegmentLoaderLocalCacheManager.java | 2 +- .../segment/loading/StorageLocation.java | 52 ++++++++++--------- ...oaderLocalCacheManagerConcurrencyTest.java | 2 +- .../segment/loading/StorageLocationTest.java | 2 +- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index a66fda0a734a..d6001ce82d3c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -308,7 +308,7 @@ public static void addSegment( } } catch (Exception e) { - // Print only log here to try other locations as well. + // Only log here to try other locations as well. log.warn(e, "Failed to write segmentFile at [%s]", destFile); location.removeFile(segmentFile); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index fa2c6065f0d2..d0670163f5fc 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -44,7 +44,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); private static final Comparator COMPARATOR = (left, right) -> - Longs.compare(right.available(), left.available()); + Longs.compare(right.availableSizeBytes(), left.availableSizeBytes()); private final IndexIO indexIO; private final SegmentLoaderConfig config; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index f6733929c2c6..916e1257d45d 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -26,14 +26,15 @@ import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.util.HashSet; import java.util.Set; /** * This class is a very simple logical representation of a local path. It keeps track of files stored under the - * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSize} and - * available space is always kept smaller than {@link #freeSpaceToKeep}. + * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSizeBytes} + * and available space is always kept smaller than {@link #freeSpaceToKeep}. * * This class is thread-safe, so that multiple threads can update its state at the same time. * One example usage is that a historical can use multiple threads to load different segments in parallel @@ -44,19 +45,25 @@ public class StorageLocation private static final Logger log = new Logger(StorageLocation.class); private final File path; - private final long maxSize; // in bytes + private final long maxSizeBytes; private final long freeSpaceToKeep; - // Set of files stored under the given path. All accesses must be synchronized with currSize. + /** + * Set of files stored under the given path. All accesses must be synchronized with currSizeBytes. + */ + @GuardedBy("this") private final Set files = new HashSet<>(); - // Current total size of files in bytes. All accesses must be synchronized with files. - private long currSize = 0; + /** + * Current total size of files in bytes. All accesses must be synchronized with files. + */ + @GuardedBy("this") + private long currSizeBytes = 0; - public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent) + public StorageLocation(File path, long maxSizeBytes, @Nullable Double freeSpacePercent) { this.path = path; - this.maxSize = maxSize; + this.maxSizeBytes = maxSizeBytes; if (freeSpacePercent != null) { long totalSpaceInPartition = path.getTotalSpace(); @@ -77,37 +84,32 @@ public File getPath() return path; } - public long getMaxSize() - { - return maxSize; - } - /** * Remove a segment file from this location. The given file argument must be a file rather than directory. */ public synchronized void removeFile(File file) { if (files.remove(file)) { - currSize -= FileUtils.sizeOf(file); + currSizeBytes -= FileUtils.sizeOf(file); } else { log.warn("File[%s] is not found under this location[%s]", file, path); } } /** - * Remove a segment dir from this location. The segment size is subtracted from currSize. + * Remove a segment dir from this location. The segment size is subtracted from currSizeBytes. */ public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { if (files.remove(segmentDir)) { - currSize -= segment.getSize(); + currSizeBytes -= segment.getSize(); } else { log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path); } } /** - * Reserves space to store the given segment. The segment size is added to currSize. + * Reserves space to store the given segment. The segment size is added to currSizeBytes. * If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise. */ @Nullable @@ -130,7 +132,7 @@ public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentI } if (canHandle(segmentId, segmentSize)) { files.add(segmentFileToAdd); - currSize += segmentSize; + currSizeBytes += segmentSize; return segmentFileToAdd; } else { return null; @@ -138,15 +140,17 @@ public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentI } /** - * This method is available for only unit tests. Production code must use {@link #reserve} instead. + * This method is only package-private to use it in unit tests. Production code must not call this method directly. + * Use {@link #reserve} instead. */ @VisibleForTesting + @GuardedBy("this") boolean canHandle(SegmentId segmentId, long segmentSize) { - if (available() < segmentSize) { + if (availableSizeBytes() < segmentSize) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param", - segmentId, segmentSize, getPath(), available() + segmentId, segmentSize, getPath(), availableSizeBytes() ); return false; } @@ -159,7 +163,7 @@ segmentId, segmentSize, getPath(), available() segmentId, segmentSize, getPath(), - available(), + availableSizeBytes(), freeSpaceToKeep, currFreeSpace ); @@ -170,8 +174,8 @@ segmentId, segmentSize, getPath(), available() return true; } - public synchronized long available() + public synchronized long availableSizeBytes() { - return maxSize - currSize; + return maxSizeBytes - currSizeBytes; } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java index 69ce71792bbc..cb0821717668 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -154,7 +154,7 @@ public void testGetSegment() throws IOException, ExecutionException, Interrupted future.get(); } - System.out.println(manager.getLocations().get(0).available()); + System.out.println(manager.getLocations().get(0).availableSizeBytes()); } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 4c4ad813384a..23da42298783 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -97,7 +97,7 @@ public void testStorageLocation() private void verifyLoc(long maxSize, StorageLocation loc) { - Assert.assertEquals(maxSize, loc.available()); + Assert.assertEquals(maxSize, loc.availableSizeBytes()); for (int i = 0; i <= maxSize; ++i) { Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); } From 9ca05b6da5bab49e795ca7f2c97be6d4a0e2268c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 23 Jul 2019 16:50:55 -0700 Subject: [PATCH 6/8] remove

tag from javadoc --- .../druid/indexing/worker/IntermediaryDataManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index d6001ce82d3c..59e33bebda80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -64,10 +64,10 @@ * This class manages intermediary segments for data shuffle between native parallel index tasks. * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers * and phase 2 tasks read those files via HTTP. - *

+ * * The directory where segment files are placed is structured as * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment. - *

+ * * This class provides interfaces to store, find, and remove segment files. * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore. @@ -187,7 +187,7 @@ private void discoverSupervisorTaskPartitions() /** * Check supervisorTask status if its partitions have not been accessed in timeout and * delete all partitions for the supervisorTask if it is already finished. - *

+ * * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * the self-cleanup for when the cleanup request is missing. */ @@ -227,7 +227,7 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - *

+ * * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static * addSegment method. */ From f67468268c0ea9bcc331b3b5f6f2ac985ded1783 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 26 Jul 2019 10:44:47 -0700 Subject: [PATCH 7/8] address comments --- .../druid/indexing/worker/IntermediaryDataManager.java | 2 +- .../segment/loading/SegmentLoaderLocalCacheManager.java | 6 +++--- .../org/apache/druid/segment/loading/StorageLocation.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index 59e33bebda80..c47425a4ee87 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -307,7 +307,7 @@ public static void addSegment( return; } } - catch (Exception e) { + catch (IOException e) { // Only log here to try other locations as well. log.warn(e, "Failed to write segmentFile at [%s]", destFile); location.removeFile(segmentFile); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index d0670163f5fc..4b74257ab794 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.primitives.Longs; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.guice.annotations.Json; @@ -43,8 +42,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); - private static final Comparator COMPARATOR = (left, right) -> - Longs.compare(right.availableSizeBytes(), left.availableSizeBytes()); + private static final Comparator COMPARATOR = Comparator + .comparing(StorageLocation::availableSizeBytes) + .reversed(); private final IndexIO indexIO; private final SegmentLoaderConfig config; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 916e1257d45d..842295ebb9fb 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -49,13 +49,13 @@ public class StorageLocation private final long freeSpaceToKeep; /** - * Set of files stored under the given path. All accesses must be synchronized with currSizeBytes. + * Set of files stored under the {@link #path}. */ @GuardedBy("this") private final Set files = new HashSet<>(); /** - * Current total size of files in bytes. All accesses must be synchronized with files. + * Current total size of files in bytes. */ @GuardedBy("this") private long currSizeBytes = 0; From 65323133cdf95f8ba1c41091a852296e1089c31a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 26 Jul 2019 10:50:32 -0700 Subject: [PATCH 8/8] comparingLong --- .../druid/segment/loading/SegmentLoaderLocalCacheManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4b74257ab794..e0f0f0b4ccdb 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -43,7 +43,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); private static final Comparator COMPARATOR = Comparator - .comparing(StorageLocation::availableSizeBytes) + .comparingLong(StorageLocation::availableSizeBytes) .reversed(); private final IndexIO indexIO;