From a21655a6707223f4f0f7c1836d8dd0a6f6ed26af Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 15:04:39 -0700 Subject: [PATCH 01/13] Fix race between canHandle() and addSegment() in StorageLocation --- .../worker/IntermediaryDataManager.java | 56 +++--- .../SegmentLoaderLocalCacheManager.java | 35 ++-- .../segment/loading/StorageLocation.java | 61 +++--- ...oaderLocalCacheManagerConcurrencyTest.java | 181 ++++++++++++++++++ .../segment/loading/StorageLocationTest.java | 26 ++- 5 files changed, 277 insertions(+), 82 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index bfd202e8835a..154c0187dd1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -63,10 +63,10 @@ * This class manages intermediary segments for data shuffle between native parallel index tasks. * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers * and phase 2 tasks read those files via HTTP. - * + *
* The directory where segment files are placed is structured as * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment. - * + *
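For illustration only (every path component below is invented, not taken from the patch), the layout that javadoc describes comes out like this:

import java.io.File;

// A runnable sketch of the shuffle directory layout described above; the base
// path, ids, interval bounds, and partition id are all hypothetical.
class PartitionDirExample
{
  public static void main(String[] args)
  {
    final File locationPath = new File("/var/druid/shuffle");   // StorageLocation#path
    final File partitionDir = new File(
        locationPath,
        "supervisor_task_1"                                     // supervisorTaskId
        + "/2019-01-01T00:00:00.000Z"                           // start of the segment interval
        + "/2019-01-02T00:00:00.000Z"                           // end of the segment interval
        + "/0"                                                  // partitionId
    );
    // Each sub task then writes one segment file named after itself under this directory.
    System.out.println(partitionDir);
  }
}
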
* This class provides interfaces to store, find, and remove segment files. * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore. @@ -186,7 +186,7 @@ private void discoverSupervisorTaskPartitions() /** * Check supervisorTask status if its partitions have not been accessed in timeout and * delete all partitions for the supervisorTask if it is already finished. - * + *
* Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * the self-cleanup for when the cleanup request is missing. */ @@ -226,12 +226,11 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - * + *
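As a condensed model of that round-robin bookkeeping (a sketch assuming Guava on the classpath; the real field maps supervisorTaskId to an iterator over StorageLocation, this one just cycles names):

import com.google.common.collect.Iterators;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One cyclic iterator per supervisorTaskId, so each supervisor's sub tasks
// spread their segment files across the configured locations.
class LocationRoundRobin
{
  private final Map<String, Iterator<String>> iterators = new ConcurrentHashMap<>();
  private final List<String> locations;

  LocationRoundRobin(List<String> locations)
  {
    this.locations = locations;
  }

  synchronized String nextFor(String supervisorTaskId)
  {
    // Guava's cyclic iterator is not thread-safe on its own, hence synchronized.
    return iterators.computeIfAbsent(supervisorTaskId, k -> Iterators.cycle(locations)).next();
  }
}
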
* This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static * addSegment method. */ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) - throws IOException { final Iterator iterator = locationIterators.computeIfAbsent( supervisorTaskId, @@ -279,35 +278,32 @@ public static void addSegment( String subTaskId, DataSegment segment, File segmentFile - ) throws IOException - { - final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId - ); - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } - location.addFile(destFile); - } - - private static StorageLocation findLocationForSegment( - Iterator cyclicIterator, - int numLocations, - DataSegment segment ) { for (int i = 0; i < numLocations; i++) { final StorageLocation location = cyclicIterator.next(); - if (location.canHandle(segment)) { - return location; + final File destFile = new File( + getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), + subTaskId + ); + if (location.reserve(destFile, segment.getId().toString(), segmentFile.length())) { + try { + FileUtils.forceMkdirParent(destFile); + final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); + if (copiedBytes == 0) { + throw new IOE( + "0 bytes copied after copying a segment file from [%s] to [%s]", + segmentFile.getAbsolutePath(), + destFile.getAbsolutePath() + ); + } else { + return; + } + } + catch (Exception e) { + log.warn(e, "Failed to write segmentFile at [%s]", destFile); + location.removeFile(segmentFile); + } } } throw new ISE("Can't find location to handle segment[%s]", segment); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b25c2163f71d..5c45bfd7e797 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -163,9 +163,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } - final File localStorageDir = new File(loc.getPath(), storageDir); - loc.addSegmentDir(localStorageDir, segment); - return localStorageDir; + return new File(loc.getPath(), storageDir); } finally { unlock(segment, lock); @@ -181,23 +179,24 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - if (loc.canHandle(segment)) { - File storageDir = new File(loc.getPath(), storageDirStr); - + File storageDir = new File(loc.getPath(), storageDirStr); + if (loc.reserve(storageDir, segment)) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; } catch 
(SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); - - cleanupCacheFiles(loc.getPath(), storageDir); + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } } @@ -366,4 +365,10 @@ public ConcurrentHashMap getSegmentLocks() { return segmentLocks; } + + @VisibleForTesting + public List getLocations() + { + return locations; + } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 4fb4c4ea0bc7..e81d04d2ca66 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -19,8 +19,8 @@ package org.apache.druid.segment.loading; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; @@ -72,65 +72,72 @@ public long getMaxSize() } /** - * Add a new file to this location. The given file argument must be a file rather than directory. + * Remove a segment file from this location. The given file argument must be a file rather than directory. */ - public synchronized void addFile(File file) + public synchronized void removeFile(File file) { - if (file.isDirectory()) { - throw new ISE("[%s] must be a file. Use a"); - } - if (files.add(file)) { - currSize += FileUtils.sizeOf(file); + if (files.remove(file)) { + currSize -= FileUtils.sizeOf(file); } } /** - * Add a new segment dir to this location. The segment size is added to currSize. + * Remove a segment dir from this location. The segment size is subtracted from currSize. */ - public synchronized void addSegmentDir(File segmentDir, DataSegment segment) + public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { - if (files.add(segmentDir)) { - currSize += segment.getSize(); + if (files.remove(segmentDir)) { + currSize -= segment.getSize(); } } /** - * Remove a segment file from this location. The given file argument must be a file rather than directory. + * Reserves space to store the given segment. The segment size is added to currSize. + * Returns true if it succeeds to add the given file. */ - public synchronized void removeFile(File file) + public synchronized boolean reserve(File segmentDir, DataSegment segment) { - if (files.remove(file)) { - currSize -= FileUtils.sizeOf(file); - } + return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } /** - * Remove a segment dir from this location. The segment size is subtracted from currSize. + * Reserves space to store the given segment. Returns true if it succeeds to add the given file. 
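This is the crux of the race fix: the capacity check and the usage update now happen inside a single synchronized method, where the old canHandle()-then-addFile() pair let two threads both pass the check before either recorded its file. A minimal model of the fixed pattern (simplified; no freeSpaceToKeep handling):

import java.io.File;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for StorageLocation's reservation logic. Checking the
// remaining capacity and recording the new file are one atomic step here, so
// two concurrent callers can no longer both claim the last free bytes.
class ReservingLocation
{
  private final long maxSize;
  private final Set<File> files = new HashSet<>();
  private long currSize;

  ReservingLocation(long maxSize)
  {
    this.maxSize = maxSize;
  }

  synchronized boolean reserve(File fileToAdd, long size)
  {
    if (files.contains(fileToAdd) || maxSize - currSize < size) {
      return false;
    }
    files.add(fileToAdd);
    currSize += size;
    return true;
  }

  synchronized void release(File file, long size)
  {
    if (files.remove(file)) {
      currSize -= size;
    }
  }
}
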
*/ - public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) + public synchronized boolean reserve(File segmentFileToAdd, String segmentId, long segmentSize) { - if (files.remove(segmentDir)) { - currSize -= segment.getSize(); + if (files.contains(segmentFileToAdd)) { + return false; + } + if (canHandle(segmentId, segmentSize)) { + files.add(segmentFileToAdd); + currSize += segmentSize; + return true; + } else { + return false; } } - public boolean canHandle(DataSegment segment) + /** + * This method is available for only unit tests. Production code must use {@link #reserve} instead. + */ + @VisibleForTesting + boolean canHandle(String segmentId, long segmentSize) { - if (available() < segment.getSize()) { + if (available() < segmentSize) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param", - segment.getId(), segment.getSize(), getPath(), available() + segmentId, segmentSize, getPath(), available() ); return false; } if (freeSpaceToKeep > 0) { long currFreeSpace = path.getFreeSpace(); - if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) { + if ((freeSpaceToKeep + segmentSize) > currFreeSpace) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].", - segment.getId(), - segment.getSize(), + segmentId, + segmentSize, getPath(), available(), freeSpaceToKeep, diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java new file mode 100644 index 000000000000..69ce71792bbc --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class SegmentLoaderLocalCacheManagerConcurrencyTest +{ + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper jsonMapper; + private final String dataSource = "test_ds"; + private final String segmentVersion; + + private File localSegmentCacheFolder; + private SegmentLoaderLocalCacheManager manager; + private ExecutorService executorService; + + public SegmentLoaderLocalCacheManagerConcurrencyTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue( + LocalDataSegmentPuller.class, + new LocalDataSegmentPuller() + ) + ); + segmentVersion = DateTimes.nowUtc().toString(); + } + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder"); + + final List locations = new ArrayList<>(); + // Each segment has the size of 1000 bytes. This deep storage is capable of storing up to 2 segments. 
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null); + locations.add(locationConfig); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + executorService = Execs.multiThreaded(4, "segment-loader-local-cache-manager-concurrency-test-%d"); + } + + @After + public void tearDown() + { + executorService.shutdownNow(); + } + + @Test + public void testGetSegment() throws IOException, ExecutionException, InterruptedException + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + final List segmentsToLoad = new ArrayList<>(4); + + final Interval interval = Intervals.of("2019-01-01/P1D"); + for (int partitionId = 0; partitionId < 4; partitionId++) { + final String segmentPath = Paths.get( + localStorageFolder.getCanonicalPath(), + dataSource, + StringUtils.format("%s_%s", interval.getStart().toString(), interval.getEnd().toString()), + segmentVersion, + String.valueOf(partitionId) + ).toString(); + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + segmentPath + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + + final DataSegment segment = newSegment(interval, partitionId).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localSegmentFile.getAbsolutePath() + ) + ); + segmentsToLoad.add(segment); + } + + final List futures = segmentsToLoad + .stream() + .map(segment -> executorService.submit(() -> manager.getSegmentFiles(segment))) + .collect(Collectors.toList()); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class)); + expectedException.expectMessage("Failed to load segment"); + for (Future future : futures) { + future.get(); + } + + System.out.println(manager.getLocations().get(0).available()); + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return DataSegment.builder() + .dataSource(dataSource) + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(segmentVersion) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NumberedShardSpec(partitionId, 0)) + .binaryVersion(9) + .size(1000L) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index cdfcd47ce8dc..6119fd66ca45 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -38,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000))); + 
Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000))); - Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000))); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -71,14 +72,14 @@ public void testStorageLocation() final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test2"), secondSegment); + loc.reserve(new File("test2"), secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); @@ -98,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(makeSegment("2013/2014", i))); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); } } @@ -116,4 +117,9 @@ private DataSegment makeSegment(String intervalString, long size) size ); } + + private SegmentId newSegmentId(String intervalString) + { + return SegmentId.of("test", Intervals.of(intervalString), "1", 0); + } } From d75b8de48e6c6bc989b36ef5ec381059fe828b9a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 15:19:42 -0700 Subject: [PATCH 02/13] add comment --- .../apache/druid/indexing/worker/IntermediaryDataManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index 154c0187dd1f..8ac988bb3820 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -301,6 +301,7 @@ public static void addSegment( } } catch (Exception e) { + // Print only log here to try other locations as well. 
log.warn(e, "Failed to write segmentFile at [%s]", destFile); location.removeFile(segmentFile); } From 00ded1a4270965e616a9e48745ae6a860224f6fc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 16:04:52 -0700 Subject: [PATCH 03/13] Add shuffleSegmentPusher which is a dataSegmentPusher used for writing shuffle data in local storage. --- .../indexing/common/config/TaskConfig.java | 5 + .../worker/IntermediaryDataManager.java | 119 +++++++----- .../worker/ShuffleDataSegmentPusher.java | 73 +++++++ ...ntermediaryDataManagerAutoCleanupTest.java | 13 +- ...iaryDataManagerManualAddAndDeleteTest.java | 45 +++-- .../worker/ShuffleDataSegmentPusherTest.java | 139 ++++++++++++++ .../SegmentLoaderLocalCacheManager.java | 35 ++-- .../segment/loading/StorageLocation.java | 61 +++--- ...oaderLocalCacheManagerConcurrencyTest.java | 181 ++++++++++++++++++ .../segment/loading/StorageLocationTest.java | 26 ++- 10 files changed, 567 insertions(+), 130 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java create mode 100644 server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 31405fe4e7c2..52bf08357976 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -126,6 +126,11 @@ public File getTaskWorkDir(String taskId) return new File(getTaskDir(taskId), "work"); } + public File getTaskTempDir(String taskId) + { + return new File(getTaskDir(taskId), "temp"); + } + public File getTaskLockFile(String taskId) { return new File(getTaskDir(taskId), "lock"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index bfd202e8835a..e207a5a27605 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -32,12 +32,14 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StreamUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CompressionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -54,19 +56,22 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; +import java.util.stream.IntStream; /** * This class manages intermediary segments for data shuffle between native parallel index tasks. * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers * and phase 2 tasks read those files via HTTP. - * + *
* The directory where segment files are placed is structured as * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment. - * + *
* This class provides interfaces to store, find, and remove segment files. * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore. @@ -75,10 +80,12 @@ public class IntermediaryDataManager { private static final Logger log = new Logger(IntermediaryDataManager.class); + private static final int DEFAULT_FILE_COPY_RETRY_COUNT = 3; private final long intermediaryPartitionDiscoveryPeriodSec; private final long intermediaryPartitionCleanupPeriodSec; private final Period intermediaryPartitionTimeout; + private final TaskConfig taskConfig; private final List shuffleDataLocations; private final IndexingServiceClient indexingServiceClient; @@ -107,6 +114,7 @@ public IntermediaryDataManager( this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec(); this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec(); this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout(); + this.taskConfig = taskConfig; this.shuffleDataLocations = taskConfig .getShuffleDataLocations() .stream() @@ -173,6 +181,9 @@ private void discoverSupervisorTaskPartitions() supervisorTaskCheckTimes.computeIfAbsent( supervisorTaskId, k -> { + for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) { + location.reserve(eachFile, eachFile.getName(), eachFile.length()); + } numDiscovered.increment(); return DateTimes.nowUtc().plus(intermediaryPartitionTimeout); } @@ -186,7 +197,7 @@ private void discoverSupervisorTaskPartitions() /** * Check supervisorTask status if its partitions have not been accessed in timeout and * delete all partitions for the supervisorTask if it is already finished. - * + *
* Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * the self-cleanup for when the cleanup request is missing. */ @@ -226,18 +237,68 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - * + *
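In the rewritten method below, the segment directory is zipped into the task temp directory before any location is chosen, so the reservation can be made with the actual compressed file size. A self-contained sketch of that reserve-then-copy loop (the Location interface is a stub standing in for StorageLocation; retries and failure cleanup are trimmed):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Reserve space for the zipped file first; only copy once a location has
// accounted for it, and fall through to the next location on refusal.
class ShuffleWriteSketch
{
  interface Location
  {
    File path();
    boolean reserve(File dest, long size); // atomic check-and-account
  }

  static File addSegment(File zippedSegment, String relativeDest, List<Location> locations)
      throws IOException
  {
    for (Location location : locations) {
      final File dest = new File(location.path(), relativeDest);
      if (location.reserve(dest, zippedSegment.length())) {
        dest.getParentFile().mkdirs();
        Files.copy(zippedSegment.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return dest;
      }
    }
    throw new IllegalStateException("Can't find location to handle segment");
  }
}
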
* This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static * addSegment method. */ - public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) + public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException { + // Get or create the location iterator for supervisorTask. final Iterator iterator = locationIterators.computeIfAbsent( supervisorTaskId, - k -> Iterators.cycle(shuffleDataLocations) + k -> { + final Iterator cyclicIterator = Iterators.cycle(shuffleDataLocations); + // Random start of the iterator + final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size()); + IntStream.range(0, random).forEach(i -> cyclicIterator.next()); + return cyclicIterator; + } ); - addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile); + + // Create a zipped segment in a temp directory. + final File taskTempDir = taskConfig.getTaskTempDir(subTaskId); + if (taskTempDir.mkdirs()) { + taskTempDir.deleteOnExit(); + } + final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); + final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile, true); + if (unzippedSizeBytes == 0) { + throw new IOE( + "Read 0 bytes from segmentDir[%s]", + segmentDir.getAbsolutePath() + ); + } + + // Try copying the zipped segment to one of storage locations + for (int i = 0; i < shuffleDataLocations.size(); i++) { + final StorageLocation location = iterator.next(); + final File destFile = new File( + getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), + subTaskId + ); + if (location.reserve(destFile, segment.getId().toString(), tempZippedFile.length())) { + try { + FileUtils.forceMkdirParent(destFile); + StreamUtils.retryCopy( + Files.asByteSource(tempZippedFile), + Files.asByteSink(destFile), + t -> !(t instanceof InterruptedException) && !(t instanceof CancellationException) && (t instanceof Exception), + DEFAULT_FILE_COPY_RETRY_COUNT + ); + if (!tempZippedFile.delete()) { + log.warn("Couldn't delete file[%s]", tempZippedFile.getAbsolutePath()); + } + return unzippedSizeBytes; + } + catch (Exception e) { + // Print only log here to try other locations as well. + log.warn(e, "Failed to write segmentFile at [%s]", destFile); + location.removeFile(tempZippedFile); + } + } + } + throw new ISE("Can't find location to handle segment[%s]", segment); } public List findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) @@ -269,50 +330,6 @@ public void deletePartitions(String supervisorTaskId) throws IOException supervisorTaskCheckTimes.remove(supervisorTaskId); } - /** - * Iterate through the given storage locations to find one which can handle the given segment. 
- */ - public static void addSegment( - Iterator cyclicIterator, - int numLocations, - String supervisorTaskId, - String subTaskId, - DataSegment segment, - File segmentFile - ) throws IOException - { - final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId - ); - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } - location.addFile(destFile); - } - - private static StorageLocation findLocationForSegment( - Iterator cyclicIterator, - int numLocations, - DataSegment segment - ) - { - for (int i = 0; i < numLocations; i++) { - final StorageLocation location = cyclicIterator.next(); - if (location.canHandle(segment)) { - return location; - } - } - throw new ISE("Can't find location to handle segment[%s]", segment); - } - private static File getPartitionDir( StorageLocation location, String supervisorTaskId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java new file mode 100644 index 000000000000..39947f413d75 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker; + +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.timeline.DataSegment; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +public class ShuffleDataSegmentPusher implements DataSegmentPusher +{ + private final String supervisorTaskId; + private final String subTaskId; + private final IntermediaryDataManager intermediaryDataManager; + + public ShuffleDataSegmentPusher( + String supervisorTaskId, + String subTaskId, + IntermediaryDataManager intermediaryDataManager + ) + { + this.supervisorTaskId = supervisorTaskId; + this.subTaskId = subTaskId; + this.intermediaryDataManager = intermediaryDataManager; + } + + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file); + return segment.withSize(unzippedSize) + .withBinaryVersion(SegmentUtils.getVersionFromDir(file)); + } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + throw new UnsupportedOperationException(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java index c158c0e33a50..13c3dcd86dbf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java @@ -116,19 +116,20 @@ public void testCleanup() throws IOException, InterruptedException { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - final File segmentFile = generateSegmentFile(); + final File segmentFile = generateSegmentDir("test"); final DataSegment segment = newSegment(interval, 0); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); - Thread.sleep(8000); + Thread.sleep(3000); Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty()); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StringUtils.UTF8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java index 7bd9e906a96b..a0c0016dcf3d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -67,7 +67,7 @@ public void setup() throws IOException false, null, null, - ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null)) + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null)) ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); @@ -83,15 +83,16 @@ public void teardown() throws InterruptedException @Test public void testAddSegmentFailure() throws IOException { - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(Intervals.of("2018/2019"), i); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Can't find location to handle segment"); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(Intervals.of("2018/2019"), 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(Intervals.of("2018/2019"), 4); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } @@ -101,15 +102,15 @@ public void testFindPartitionFiles() throws IOException final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); final int partitionId = 0; - for (int i = 0; i < 10; i++) { - final File segmentFile = generateSegmentFile(); + for (int i = 0; i < 4; i++) { + final File segmentFile = generateSegmentDir("file_" + i); final DataSegment segment = newSegment(interval, partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile); } final List files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId); - Assert.assertEquals(10, files.size()); + Assert.assertEquals(4, files.size()); files.sort(Comparator.comparing(File::getName)); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 4; i++) { Assert.assertEquals("subTaskId_" + i, files.get(i).getName()); } } @@ -119,9 +120,9 @@ public void deletePartitions() throws IOException { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int partitionId = 0; partitionId < 5; partitionId++) { - for (int subTaskId = 0; subTaskId < 3; subTaskId++) { - final File segmentFile = generateSegmentFile(); + for (int partitionId = 0; partitionId < 2; partitionId++) { + for (int subTaskId = 0; subTaskId < 2; subTaskId++) { + final File segmentFile = generateSegmentDir("file_" + partitionId + "_" + subTaskId); final DataSegment segment = newSegment(interval, partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile); } @@ -129,7 +130,7 @@ public void deletePartitions() throws IOException intermediaryDataManager.deletePartitions(supervisorTaskId); - for (int partitionId = 0; partitionId < 5; partitionId++) { + for (int partitionId = 0; partitionId < 2; partitionId++) { Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty()); } } @@ -139,22 +140,24 @@ public void testAddRemoveAdd() throws IOException { final String 
supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(interval, i); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } intermediaryDataManager.deletePartitions(supervisorTaskId); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(interval, 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(interval, i); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StringUtils.UTF8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java new file mode 100644 index 000000000000..f149fa6aee16 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.worker; + +import com.amazonaws.util.StringUtils; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import com.google.common.primitives.Ints; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.FileUtils.FileCopyResult; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.utils.CompressionUtils; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +public class ShuffleDataSegmentPusherTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private IntermediaryDataManager intermediaryDataManager; + private ShuffleDataSegmentPusher segmentPusher; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig(); + final TaskConfig taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + intermediaryDataManager.start(); + segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager); + } + + @After + public void teardown() throws InterruptedException + { + intermediaryDataManager.stop(); + } + + @Test + public void testPush() throws IOException + { + final File segmentDir = generateSegmentDir(); + final DataSegment segment = newSegment(Intervals.of("2018/2019")); + final DataSegment pushed = segmentPusher.push(segmentDir, segment, true); + + Assert.assertEquals(9, pushed.getBinaryVersion().intValue()); + Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version + + final List files = intermediaryDataManager.findPartitionFiles( + "supervisorTaskId", + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + Assert.assertEquals(1, files.size()); + final File zippedSegment = files.get(0); + final File tempDir = temporaryFolder.newFolder(); + final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir); + final List unzippedFiles = new ArrayList<>(result.getFiles()); + unzippedFiles.sort(Comparator.comparing(File::getName)); + final File dataFile = unzippedFiles.get(0); + Assert.assertEquals("test", dataFile.getName()); + Assert.assertEquals("test data.", Files.readFirstLine(dataFile, StringUtils.UTF8)); + final File versionFile = unzippedFiles.get(1); + Assert.assertEquals("version.bin", versionFile.getName()); + Assert.assertArrayEquals(Ints.toByteArray(0x9), Files.toByteArray(versionFile)); + } + + private File generateSegmentDir() throws IOException 
+ { + // Each file size is 138 bytes after compression + final File segmentDir = temporaryFolder.newFolder(); + Files.asByteSink(new File(segmentDir, "version.bin")).write(Ints.toByteArray(0x9)); + FileUtils.write(new File(segmentDir, "test"), "test data.", StringUtils.UTF8); + return segmentDir; + } + + private DataSegment newSegment(Interval interval) + { + return new DataSegment( + "dataSource", + interval, + "version", + null, + null, + null, + new NumberedShardSpec(0, 0), + 9, + 0 + ); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index b25c2163f71d..5c45bfd7e797 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -163,9 +163,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); } - final File localStorageDir = new File(loc.getPath(), storageDir); - loc.addSegmentDir(localStorageDir, segment); - return localStorageDir; + return new File(loc.getPath(), storageDir); } finally { unlock(segment, lock); @@ -181,23 +179,24 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - if (loc.canHandle(segment)) { - File storageDir = new File(loc.getPath(), storageDirStr); - + File storageDir = new File(loc.getPath(), storageDirStr); + if (loc.reserve(storageDir, segment)) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; } catch (SegmentLoadingException e) { - log.makeAlert( - e, - "Failed to load segment in current location %s, try next location if any", - loc.getPath().getAbsolutePath() - ) - .addData("location", loc.getPath().getAbsolutePath()) - .emit(); - - cleanupCacheFiles(loc.getPath(), storageDir); + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + loc.removeSegmentDir(storageDir, segment); + cleanupCacheFiles(loc.getPath(), storageDir); + } } } } @@ -366,4 +365,10 @@ public ConcurrentHashMap getSegmentLocks() { return segmentLocks; } + + @VisibleForTesting + public List getLocations() + { + return locations; + } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 4fb4c4ea0bc7..e81d04d2ca66 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -19,8 +19,8 @@ package org.apache.druid.segment.loading; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; @@ -72,65 +72,72 @@ public long getMaxSize() } /** - * Add a new file to this location. The given file argument must be a file rather than directory. + * Remove a segment file from this location. 
The given file argument must be a file rather than directory. */ - public synchronized void addFile(File file) + public synchronized void removeFile(File file) { - if (file.isDirectory()) { - throw new ISE("[%s] must be a file. Use a"); - } - if (files.add(file)) { - currSize += FileUtils.sizeOf(file); + if (files.remove(file)) { + currSize -= FileUtils.sizeOf(file); } } /** - * Add a new segment dir to this location. The segment size is added to currSize. + * Remove a segment dir from this location. The segment size is subtracted from currSize. */ - public synchronized void addSegmentDir(File segmentDir, DataSegment segment) + public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) { - if (files.add(segmentDir)) { - currSize += segment.getSize(); + if (files.remove(segmentDir)) { + currSize -= segment.getSize(); } } /** - * Remove a segment file from this location. The given file argument must be a file rather than directory. + * Reserves space to store the given segment. The segment size is added to currSize. + * Returns true if it succeeds to add the given file. */ - public synchronized void removeFile(File file) + public synchronized boolean reserve(File segmentDir, DataSegment segment) { - if (files.remove(file)) { - currSize -= FileUtils.sizeOf(file); - } + return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } /** - * Remove a segment dir from this location. The segment size is subtracted from currSize. + * Reserves space to store the given segment. Returns true if it succeeds to add the given file. */ - public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) + public synchronized boolean reserve(File segmentFileToAdd, String segmentId, long segmentSize) { - if (files.remove(segmentDir)) { - currSize -= segment.getSize(); + if (files.contains(segmentFileToAdd)) { + return false; + } + if (canHandle(segmentId, segmentSize)) { + files.add(segmentFileToAdd); + currSize += segmentSize; + return true; + } else { + return false; } } - public boolean canHandle(DataSegment segment) + /** + * This method is available for only unit tests. Production code must use {@link #reserve} instead. + */ + @VisibleForTesting + boolean canHandle(String segmentId, long segmentSize) { - if (available() < segment.getSize()) { + if (available() < segmentSize) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d]. Check your druid.segmentCache.locations maxSize param", - segment.getId(), segment.getSize(), getPath(), available() + segmentId, segmentSize, getPath(), available() ); return false; } if (freeSpaceToKeep > 0) { long currFreeSpace = path.getFreeSpace(); - if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) { + if ((freeSpaceToKeep + segmentSize) > currFreeSpace) { log.warn( "Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].", - segment.getId(), - segment.getSize(), + segmentId, + segmentSize, getPath(), available(), freeSpaceToKeep, diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java new file mode 100644 index 000000000000..69ce71792bbc --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class SegmentLoaderLocalCacheManagerConcurrencyTest +{ + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper jsonMapper; + private final String dataSource = "test_ds"; + private final String segmentVersion; + + private File localSegmentCacheFolder; + private SegmentLoaderLocalCacheManager manager; + private ExecutorService executorService; + + public SegmentLoaderLocalCacheManagerConcurrencyTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue( + LocalDataSegmentPuller.class, + new LocalDataSegmentPuller() + ) + ); + segmentVersion = DateTimes.nowUtc().toString(); + } + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder"); + + final List locations = new ArrayList<>(); + // Each segment has the size of 1000 bytes. This deep storage is capable of storing up to 2 segments. 
+ final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null); + locations.add(locationConfig); + + manager = new SegmentLoaderLocalCacheManager( + TestHelper.getTestIndexIO(), + new SegmentLoaderConfig().withLocations(locations), + jsonMapper + ); + executorService = Execs.multiThreaded(4, "segment-loader-local-cache-manager-concurrency-test-%d"); + } + + @After + public void tearDown() + { + executorService.shutdownNow(); + } + + @Test + public void testGetSegment() throws IOException, ExecutionException, InterruptedException + { + final File localStorageFolder = tmpFolder.newFolder("local_storage_folder"); + final List segmentsToLoad = new ArrayList<>(4); + + final Interval interval = Intervals.of("2019-01-01/P1D"); + for (int partitionId = 0; partitionId < 4; partitionId++) { + final String segmentPath = Paths.get( + localStorageFolder.getCanonicalPath(), + dataSource, + StringUtils.format("%s_%s", interval.getStart().toString(), interval.getEnd().toString()), + segmentVersion, + String.valueOf(partitionId) + ).toString(); + // manually create a local segment under localStorageFolder + final File localSegmentFile = new File( + localStorageFolder, + segmentPath + ); + localSegmentFile.mkdirs(); + final File indexZip = new File(localSegmentFile, "index.zip"); + indexZip.createNewFile(); + + final DataSegment segment = newSegment(interval, partitionId).withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + localSegmentFile.getAbsolutePath() + ) + ); + segmentsToLoad.add(segment); + } + + final List futures = segmentsToLoad + .stream() + .map(segment -> executorService.submit(() -> manager.getSegmentFiles(segment))) + .collect(Collectors.toList()); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class)); + expectedException.expectMessage("Failed to load segment"); + for (Future future : futures) { + future.get(); + } + + System.out.println(manager.getLocations().get(0).available()); + } + + private DataSegment newSegment(Interval interval, int partitionId) + { + return DataSegment.builder() + .dataSource(dataSource) + .interval(interval) + .loadSpec( + ImmutableMap.of( + "type", + "local", + "path", + "somewhere" + ) + ) + .version(segmentVersion) + .dimensions(ImmutableList.of()) + .metrics(ImmutableList.of()) + .shardSpec(new NumberedShardSpec(partitionId, 0)) + .binaryVersion(9) + .size(1000L) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index cdfcd47ce8dc..6119fd66ca45 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -38,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationPlain.canHandle(makeSegment("2012/2013", 11_000))); + 
Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(makeSegment("2012/2013", 9_000))); - Assert.assertFalse(locationFree.canHandle(makeSegment("2012/2013", 11_000))); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(makeSegment("2012/2013", 4_000))); - Assert.assertFalse(locationFull.canHandle(makeSegment("2012/2013", 6_000))); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -71,14 +72,14 @@ public void testStorageLocation() final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.addSegmentDir(new File("test2"), secondSegment); + loc.reserve(new File("test2"), secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); @@ -98,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(makeSegment("2013/2014", i))); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); } } @@ -116,4 +117,9 @@ private DataSegment makeSegment(String intervalString, long size) size ); } + + private SegmentId newSegmentId(String intervalString) + { + return SegmentId.of("test", Intervals.of(intervalString), "1", 0); + } } From db31b66ccdb87aa160a51549a2aa46f5cbef0f86 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 19 Jul 2019 16:40:46 -0700 Subject: [PATCH 04/13] add comments --- .../druid/indexing/worker/IntermediaryDataManager.java | 9 ++++----- .../druid/indexing/worker/ShuffleDataSegmentPusher.java | 4 ++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index e207a5a27605..a8b45b19bc50 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -66,7 +66,7 @@ /** * This class manages intermediary segments for data shuffle between native parallel index tasks. 
- * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
+ * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
  * and phase 2 tasks read those files via HTTP.
  * <p>
 * The directory where segment files are placed is structured as
@@ -182,7 +182,9 @@ private void discoverSupervisorTaskPartitions()
         supervisorTaskId,
         k -> {
           for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
-            location.reserve(eachFile, eachFile.getName(), eachFile.length());
+            if (!location.reserve(eachFile, eachFile.getName(), eachFile.length())) {
+              log.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
+            }
           }
           numDiscovered.increment();
           return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
@@ -237,9 +239,6 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
   /**
    * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
    * supervisorTaskId.
-   * <p>
-   * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
-   * addSegment method.
    */
   public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
       throws IOException
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
index 39947f413d75..c28a34e654f8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
@@ -28,6 +28,10 @@
 import java.net.URI;
 import java.util.Map;
 
+/**
+ * DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel
+ * indexing.
+ */
 public class ShuffleDataSegmentPusher implements DataSegmentPusher
 {
   private final String supervisorTaskId;

From 999d3e5322f0a866248f462547c503b106be923b Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 19 Jul 2019 17:24:58 -0700
Subject: [PATCH 05/13] unused import
---
 .../druid/indexing/worker/ShuffleDataSegmentPusherTest.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
index f149fa6aee16..8a98b4103de8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
@@ -45,7 +45,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;

From d7903f54f94f5bc236cea27891ab935496b2e34a Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Sat, 20 Jul 2019 22:02:25 -0700
Subject: [PATCH 06/13] add comments
---
 .../worker/IntermediaryDataManager.java       | 35 +++++++++++-----
 .../SegmentLoaderLocalCacheManager.java       |  4 +-
 .../segment/loading/StorageLocation.java      | 42 ++++++++++++++-----
 .../segment/loading/StorageLocationTest.java  | 28 ++++++-------
 4 files changed, 72 insertions(+), 37 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index 8ac988bb3820..a66fda0a734a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -45,6 +45,7 @@
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -242,7 +243,7 @@ public void addSegment(String supervisorTaskId, String subTaskId, DataSegment se
 
   public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
   {
     for (StorageLocation location : shuffleDataLocations) {
-      final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId);
+      final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc()); final File[] segmentFiles = partitionDir.listFiles(); @@ -282,11 +283,17 @@ public static void addSegment( { for (int i = 0; i < numLocations; i++) { final StorageLocation location = cyclicIterator.next(); - final File destFile = new File( - getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()), - subTaskId + final File destFile = location.reserve( + getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ), + segment.getId(), + segmentFile.length() ); - if (location.reserve(destFile, segment.getId().toString(), segmentFile.length())) { + if (destFile != null) { try { FileUtils.forceMkdirParent(destFile); final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); @@ -310,19 +317,27 @@ public static void addSegment( throw new ISE("Can't find location to handle segment[%s]", segment); } - private static File getPartitionDir( - StorageLocation location, + private static String getPartitionFilePath( + String supervisorTaskId, + String subTaskId, + Interval interval, + int partitionId + ) + { + return Paths.get(getPartitionDir(supervisorTaskId, interval, partitionId), subTaskId).toString(); + } + + private static String getPartitionDir( String supervisorTaskId, Interval interval, int partitionId ) { - return FileUtils.getFile( - location.getPath(), + return Paths.get( supervisorTaskId, interval.getStart().toString(), interval.getEnd().toString(), String.valueOf(partitionId) - ); + ).toString(); } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 5c45bfd7e797..fa2c6065f0d2 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -179,8 +179,8 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException { for (StorageLocation loc : locations) { - File storageDir = new File(loc.getPath(), storageDirStr); - if (loc.reserve(storageDir, segment)) { + File storageDir = loc.reserve(storageDirStr, segment); + if (storageDir != null) { try { loadInLocationWithStartMarker(segment, storageDir); return loc; diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index e81d04d2ca66..f6733929c2c6 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import java.io.File; @@ -30,17 +31,27 @@ import java.util.Set; /** + * This class is a very simple logical representation of a local path. 
It keeps track of files stored under the
+ * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSize} and
+ * available space is always kept larger than {@link #freeSpaceToKeep}.
+ *
+ * This class is thread-safe, so that multiple threads can update its state at the same time.
+ * One example usage is that a historical can use multiple threads to load different segments in parallel
+ * from deep storage.
  */
 public class StorageLocation
 {
   private static final Logger log = new Logger(StorageLocation.class);
 
   private final File path;
-  private final long maxSize;
+  private final long maxSize; // in bytes
   private final long freeSpaceToKeep;
+
+  // Set of files stored under the given path. All accesses must be synchronized with currSize.
   private final Set<File> files = new HashSet<>();
 
-  private volatile long currSize = 0;
+  // Current total size of files in bytes. All accesses must be synchronized with files.
+  private long currSize = 0;
 
   public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
   {
@@ -78,6 +89,8 @@ public synchronized void removeFile(File file)
   {
     if (files.remove(file)) {
       currSize -= FileUtils.sizeOf(file);
+    } else {
+      log.warn("File[%s] is not found under this location[%s]", file, path);
     }
   }
 
@@ -88,32 +101,39 @@ public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
   {
     if (files.remove(segmentDir)) {
       currSize -= segment.getSize();
+    } else {
+      log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path);
     }
   }
 
   /**
    * Reserves space to store the given segment. The segment size is added to currSize.
-   * Returns true if it succeeds to add the given file.
+   * If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise.
    */
-  public synchronized boolean reserve(File segmentDir, DataSegment segment)
+  @Nullable
+  public synchronized File reserve(String segmentDir, DataSegment segment)
   {
-    return reserve(segmentDir, segment.getId().toString(), segment.getSize());
+    return reserve(segmentDir, segment.getId(), segment.getSize());
   }
 
   /**
-   * Reserves space to store the given segment. Returns true if it succeeds to add the given file.
+   * Reserves space to store the given segment.
+   * If it succeeds, it returns a file for the given segmentFilePathToAdd in this storage location.
+   * Returns null otherwise.
    */
-  public synchronized boolean reserve(File segmentFileToAdd, String segmentId, long segmentSize)
+  @Nullable
+  public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize)
   {
+    final File segmentFileToAdd = new File(path, segmentFilePathToAdd);
     if (files.contains(segmentFileToAdd)) {
-      return false;
+      return null;
    }
    if (canHandle(segmentId, segmentSize)) {
      files.add(segmentFileToAdd);
      currSize += segmentSize;
-      return true;
+      return segmentFileToAdd;
    } else {
-      return false;
+      return null;
    }
  }

@@ -121,7 +141,7 @@ public synchronized boolean reserve(File segmentFileToAdd, String segmentId, lon
    * This method is available for only unit tests. Production code must use {@link #reserve} instead.
*/ @VisibleForTesting - boolean canHandle(String segmentId, long segmentSize) + boolean canHandle(SegmentId segmentId, long segmentSize) { if (available() < segmentSize) { log.warn( diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 6119fd66ca45..7321e08bab6a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -39,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); - Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); - Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); - Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -66,31 +66,31 @@ private StorageLocation fakeLocation(long total, long free, long max, Double per public void testStorageLocation() { long expectedAvail = 1000L; - StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null); + StorageLocation loc = new StorageLocation(new File(""), expectedAvail, null); verifyLoc(expectedAvail, loc); final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23); - loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail -= 10; verifyLoc(expectedAvail, loc); - loc.reserve(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.reserve("test1", makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.reserve(new File("test2"), secondSegment); + loc.reserve("test2", secondSegment); expectedAvail -= 23; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); expectedAvail += 10; verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10)); + loc.removeSegmentDir(new File("/tmp/test1"), makeSegment("2012-01-01/2012-01-02", 10)); verifyLoc(expectedAvail, loc); - loc.removeSegmentDir(new File("test2"), secondSegment); + 
loc.removeSegmentDir(new File("/tmp/test2"), secondSegment); expectedAvail += 23; verifyLoc(expectedAvail, loc); } @@ -99,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.available()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); } } From 5a1acaaeccefa9e686da9d078d71f66f3dfe5da4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 21 Jul 2019 09:19:31 -0700 Subject: [PATCH 07/13] fix test --- .../org/apache/druid/segment/loading/StorageLocationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 7321e08bab6a..4c4ad813384a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -66,7 +66,7 @@ private StorageLocation fakeLocation(long total, long free, long max, Double per public void testStorageLocation() { long expectedAvail = 1000L; - StorageLocation loc = new StorageLocation(new File(""), expectedAvail, null); + StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null); verifyLoc(expectedAvail, loc); From a8e894f268fb2c51c4fe5b6b1f7988b31ad4eb97 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 23 Jul 2019 16:24:58 -0700 Subject: [PATCH 08/13] address comments --- .../worker/IntermediaryDataManager.java | 2 +- .../SegmentLoaderLocalCacheManager.java | 2 +- .../segment/loading/StorageLocation.java | 52 ++++++++++--------- ...oaderLocalCacheManagerConcurrencyTest.java | 2 +- .../segment/loading/StorageLocationTest.java | 2 +- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index a66fda0a734a..d6001ce82d3c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -308,7 +308,7 @@ public static void addSegment( } } catch (Exception e) { - // Print only log here to try other locations as well. + // Only log here to try other locations as well. 
log.warn(e, "Failed to write segmentFile at [%s]", destFile);
           location.removeFile(segmentFile);
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index fa2c6065f0d2..d0670163f5fc 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -44,7 +44,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
   private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
   private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
-      Longs.compare(right.available(), left.available());
+      Longs.compare(right.availableSizeBytes(), left.availableSizeBytes());
 
   private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index f6733929c2c6..916e1257d45d 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -26,14 +26,15 @@
 import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import java.io.File;
 import java.util.HashSet;
 import java.util.Set;
 
 /**
  * This class is a very simple logical representation of a local path. It keeps track of files stored under the
- * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSize} and
- * available space is always kept larger than {@link #freeSpaceToKeep}.
+ * {@link #path} via {@link #reserve}, so that the total size of stored files doesn't exceed the {@link #maxSizeBytes}
+ * and available space is always kept larger than {@link #freeSpaceToKeep}.
  *
  * This class is thread-safe, so that multiple threads can update its state at the same time.
  * One example usage is that a historical can use multiple threads to load different segments in parallel
  * from deep storage.
  */
 public class StorageLocation
 {
   private static final Logger log = new Logger(StorageLocation.class);
 
   private final File path;
-  private final long maxSize; // in bytes
+  private final long maxSizeBytes;
   private final long freeSpaceToKeep;
 
-  // Set of files stored under the given path. All accesses must be synchronized with currSize.
+  /**
+   * Set of files stored under the given path. All accesses must be synchronized with currSizeBytes.
+   */
+  @GuardedBy("this")
   private final Set<File> files = new HashSet<>();
 
-  // Current total size of files in bytes. All accesses must be synchronized with files.
-  private long currSize = 0;
+  /**
+   * Current total size of files in bytes. All accesses must be synchronized with files.
+ */
+  @GuardedBy("this")
+  private long currSizeBytes = 0;
 
-  public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
+  public StorageLocation(File path, long maxSizeBytes, @Nullable Double freeSpacePercent)
   {
     this.path = path;
-    this.maxSize = maxSize;
+    this.maxSizeBytes = maxSizeBytes;
 
     if (freeSpacePercent != null) {
       long totalSpaceInPartition = path.getTotalSpace();
@@ -77,37 +84,32 @@ public File getPath()
     return path;
   }
 
-  public long getMaxSize()
-  {
-    return maxSize;
-  }
-
   /**
    * Remove a segment file from this location. The given file argument must be a file rather than directory.
    */
   public synchronized void removeFile(File file)
   {
     if (files.remove(file)) {
-      currSize -= FileUtils.sizeOf(file);
+      currSizeBytes -= FileUtils.sizeOf(file);
     } else {
       log.warn("File[%s] is not found under this location[%s]", file, path);
     }
   }
 
   /**
-   * Remove a segment dir from this location. The segment size is subtracted from currSize.
+   * Remove a segment dir from this location. The segment size is subtracted from currSizeBytes.
    */
   public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
   {
     if (files.remove(segmentDir)) {
-      currSize -= segment.getSize();
+      currSizeBytes -= segment.getSize();
     } else {
       log.warn("SegmentDir[%s] is not found under this location[%s]", segmentDir, path);
     }
   }
 
   /**
-   * Reserves space to store the given segment. The segment size is added to currSize.
+   * Reserves space to store the given segment. The segment size is added to currSizeBytes.
    * If it succeeds, it returns a file for the given segmentDir in this storage location. Returns null otherwise.
    */
   @Nullable
@@ -130,7 +132,7 @@ public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentI
     }
     if (canHandle(segmentId, segmentSize)) {
       files.add(segmentFileToAdd);
-      currSize += segmentSize;
+      currSizeBytes += segmentSize;
       return segmentFileToAdd;
     } else {
       return null;
@@ -138,15 +140,17 @@
   }
 
   /**
-   * This method is available for only unit tests. Production code must use {@link #reserve} instead.
+   * This method is package-private only for use in unit tests. Production code must not call this method directly.
+   * Use {@link #reserve} instead.
    */
   @VisibleForTesting
+  @GuardedBy("this")
   boolean canHandle(SegmentId segmentId, long segmentSize)
   {
-    if (available() < segmentSize) {
+    if (availableSizeBytes() < segmentSize) {
       log.warn(
          "Segment[%s:%,d] too large for storage[%s:%,d].
Check your druid.segmentCache.locations maxSize param", - segmentId, segmentSize, getPath(), available() + segmentId, segmentSize, getPath(), availableSizeBytes() ); return false; } @@ -159,7 +163,7 @@ segmentId, segmentSize, getPath(), available() segmentId, segmentSize, getPath(), - available(), + availableSizeBytes(), freeSpaceToKeep, currFreeSpace ); @@ -170,8 +174,8 @@ segmentId, segmentSize, getPath(), available() return true; } - public synchronized long available() + public synchronized long availableSizeBytes() { - return maxSize - currSize; + return maxSizeBytes - currSizeBytes; } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java index 69ce71792bbc..cb0821717668 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -154,7 +154,7 @@ public void testGetSegment() throws IOException, ExecutionException, Interrupted future.get(); } - System.out.println(manager.getLocations().get(0).available()); + System.out.println(manager.getLocations().get(0).availableSizeBytes()); } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 4c4ad813384a..23da42298783 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -97,7 +97,7 @@ public void testStorageLocation() private void verifyLoc(long maxSize, StorageLocation loc) { - Assert.assertEquals(maxSize, loc.available()); + Assert.assertEquals(maxSize, loc.availableSizeBytes()); for (int i = 0; i <= maxSize; ++i) { Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); } From 9ca05b6da5bab49e795ca7f2c97be6d4a0e2268c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 23 Jul 2019 16:50:55 -0700 Subject: [PATCH 09/13] remove
<p> tag from javadoc
---
 .../druid/indexing/worker/IntermediaryDataManager.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index d6001ce82d3c..59e33bebda80 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -64,10 +64,10 @@
  * This class manages intermediary segments for data shuffle between native parallel index tasks.
  * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
  * and phase 2 tasks read those files via HTTP.
- * <p>
+ *
  * The directory where segment files are placed is structured as
  * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment.
- * <p>
+ *
  * This class provides interfaces to store, find, and remove segment files.
  * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
  * per supervisorTask and removes its all segment files if the supervisorTask is not running anymore.
@@ -187,7 +187,7 @@ private void discoverSupervisorTaskPartitions()
  /**
   * Check supervisorTask status if its partitions have not been accessed in timeout and
   * delete all partitions for the supervisorTask if it is already finished.
- * <p>
+ *
  * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
  * the self-cleanup for when the cleanup request is missing.
  */
@@ -227,7 +227,7 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup
  /**
   * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
   * supervisorTaskId.
- * <p>
+ *
  * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static
  * addSegment method.
  */

From f67468268c0ea9bcc331b3b5f6f2ac985ded1783 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 26 Jul 2019 10:44:47 -0700
Subject: [PATCH 10/13] address comments
---
 .../druid/indexing/worker/IntermediaryDataManager.java    | 2 +-
 .../segment/loading/SegmentLoaderLocalCacheManager.java   | 6 +++---
 .../org/apache/druid/segment/loading/StorageLocation.java | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index 59e33bebda80..c47425a4ee87 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -307,7 +307,7 @@ public static void addSegment(
         return;
       }
     }
-    catch (Exception e) {
+    catch (IOException e) {
       // Only log here to try other locations as well.
       log.warn(e, "Failed to write segmentFile at [%s]", destFile);
       location.removeFile(segmentFile);
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index d0670163f5fc..4b74257ab794 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.guice.annotations.Json;
@@ -43,8 +42,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
   private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
-  private static final Comparator<StorageLocation> COMPARATOR = (left, right) ->
-      Longs.compare(right.availableSizeBytes(), left.availableSizeBytes());
+  private static final Comparator<StorageLocation> COMPARATOR = Comparator
+      .comparing(StorageLocation::availableSizeBytes)
+      .reversed();
 
   private final IndexIO indexIO;
   private final SegmentLoaderConfig config;
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index 916e1257d45d..842295ebb9fb 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -49,13 +49,13 @@ public class StorageLocation
   private final long freeSpaceToKeep;
 
   /**
-   * Set of files stored under the given path. All accesses must be synchronized with currSizeBytes.
+   * Set of files stored under the {@link #path}.
    */
   @GuardedBy("this")
   private final Set<File> files = new HashSet<>();
 
   /**
-   * Current total size of files in bytes. All accesses must be synchronized with files.
+   * Current total size of files in bytes.
*/
   @GuardedBy("this")
   private long currSizeBytes = 0;

From 65323133cdf95f8ba1c41091a852296e1089c31a Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 26 Jul 2019 10:50:32 -0700
Subject: [PATCH 11/13] comparingLong
---
 .../druid/segment/loading/SegmentLoaderLocalCacheManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 4b74257ab794..e0f0f0b4ccdb 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -43,7 +43,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
 {
   private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
   private static final Comparator<StorageLocation> COMPARATOR = Comparator
-      .comparing(StorageLocation::availableSizeBytes)
+      .comparingLong(StorageLocation::availableSizeBytes)
       .reversed();

From 932d9aa73b21c34410edc8eec410e87f9a36105f Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 2 Aug 2019 14:54:00 -0700
Subject: [PATCH 12/13] Address comments
---
 .../worker/IntermediaryDataManager.java | 105 +++++++++---------
 1 file changed, 52 insertions(+), 53 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index ae4263fcf5e9..0d0ca11f8b91 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -32,7 +32,6 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StreamUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -59,7 +58,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -82,8 +80,7 @@
 @ManageLifecycle
 public class IntermediaryDataManager
 {
-  private static final Logger log = new Logger(IntermediaryDataManager.class);
-  private static final int DEFAULT_FILE_COPY_RETRY_COUNT = 3;
+  private static final Logger LOG = new Logger(IntermediaryDataManager.class);
 
   private final long intermediaryPartitionDiscoveryPeriodSec;
   private final long intermediaryPartitionCleanupPeriodSec;
@@ -138,7 +135,7 @@ public void start()
           discoverSupervisorTaskPartitions();
         }
         catch (Exception e) {
-          log.warn(e, "Error while discovering supervisorTasks");
+          LOG.warn(e, "Error while discovering supervisorTasks");
         }
       },
       intermediaryPartitionDiscoveryPeriodSec,
@@ -152,10 +149,10 @@
           deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
         }
         catch (InterruptedException e) {
-          log.error(e, "Error while cleaning up partitions for expired supervisors");
+          LOG.error(e, "Error while cleaning up partitions for expired supervisors");
        }
        catch (Exception e) {
-          log.warn(e, "Error while cleaning up partitions for expired supervisors");
+          LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
        }
      },
      intermediaryPartitionCleanupPeriodSec,
@@ -201,7 +198,7 @@ private void discoverSupervisorTaskPartitions()
                 eachFile.length()
             );
             if (reservedFile == null) {
-              log.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
+              LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
             }
           }
           numDiscovered.increment();
@@ -210,7 +207,7 @@
         );
       }
     }
-    log.info(
+    LOG.info(
         "Discovered partitions for [%s] new supervisor tasks under location[%s]",
         numDiscovered.getValue(),
         location.getPath()
@@ -237,7 +234,7 @@
       }
     }
 
-    log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+    LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
 
    final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
    for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
@@ -249,7 +246,7 @@
          deletePartitions(supervisorTaskId);
        }
        catch (IOException e) {
-          log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+          LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
        }
      } else {
        // If it's still running, update last access time.
@@ -279,52 +276,54 @@ long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment,
 
    // Create a zipped segment in a temp directory.
    final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
-    final Closer resourceCloser = Closer.create();
-    if (taskTempDir.mkdirs()) {
-      resourceCloser.register(() -> FileUtils.forceDelete(taskTempDir));
-    }
-    final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
-    final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
-    if (unzippedSizeBytes == 0) {
-      throw new IOE(
-          "Read 0 bytes from segmentDir[%s]",
-          segmentDir.getAbsolutePath()
-      );
-    }
-    resourceCloser.register(() -> )
-    // Try copying the zipped segment to one of storage locations
-    for (int i = 0; i < shuffleDataLocations.size(); i++) {
-      final StorageLocation location = iterator.next();
-      final String partitionFilePath = getPartitionFilePath(
-          supervisorTaskId,
-          subTaskId,
-          segment.getInterval(),
-          segment.getShardSpec().getPartitionNum()
-      );
-      final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
-      if (destFile != null) {
-        try {
-          FileUtils.forceMkdirParent(destFile);
-          StreamUtils.retryCopy(
-              Files.asByteSource(tempZippedFile),
-              Files.asByteSink(destFile),
-              t -> !(t instanceof InterruptedException) && !(t instanceof CancellationException) && (t instanceof Exception),
-              DEFAULT_FILE_COPY_RETRY_COUNT
-          );
-          if (!tempZippedFile.delete()) {
-            log.warn("Couldn't delete file[%s]", tempZippedFile.getAbsolutePath());
+    try (final Closer resourceCloser = Closer.create()) {
+      if (taskTempDir.mkdirs()) {
+        resourceCloser.register(() -> {
+          try {
+            FileUtils.forceDelete(taskTempDir);
          }
-          return unzippedSizeBytes;
-        }
-        catch (IOException e) {
-          // Only log here to try other locations as well.
-          log.warn(e, "Failed to write segmentFile at [%s]", destFile);
-          location.removeFile(tempZippedFile);
+          catch (IOException e) {
+            LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
+          }
+        });
+      }
+
+      // Temporary compressed file. Will be removed when taskTempDir is deleted.
+      final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
+      final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
+      if (unzippedSizeBytes == 0) {
+        throw new IOE(
+            "Read 0 bytes from segmentDir[%s]",
+            segmentDir.getAbsolutePath()
+        );
+      }
+
+      // Try copying the zipped segment to one of storage locations
+      for (int i = 0; i < shuffleDataLocations.size(); i++) {
+        final StorageLocation location = iterator.next();
+        final String partitionFilePath = getPartitionFilePath(
+            supervisorTaskId,
+            subTaskId,
+            segment.getInterval(),
+            segment.getShardSpec().getPartitionNum()
+        );
+        final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
+        if (destFile != null) {
+          org.apache.druid.java.util.common.FileUtils.writeAtomically(
+              destFile,
+              out -> Files.asByteSource(tempZippedFile).copyTo(out)
+          );
+          LOG.info(
+              "Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]",
+              segment.getId(),
+              subTaskId,
+              destFile
+          );
+        }
+      }
+      throw new ISE("Can't find location to handle segment[%s]", segment);
    }
-    throw new ISE("Can't find location to handle segment[%s]", segment);
  }
 
  public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
@@ -346,7 +345,7 @@ public void deletePartitions(String supervisorTaskId) throws IOException
    for (StorageLocation location : shuffleDataLocations) {
      final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
      if (supervisorTaskPath.exists()) {
-        log.info("Cleaning up [%s]", supervisorTaskPath);
+        LOG.info("Cleaning up [%s]", supervisorTaskPath);
        for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
          location.removeFile(eachFile);
        }

From 736781a77154dd3146895df58f26acb67a6f55a2 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 2 Aug 2019 16:13:22 -0700
Subject: [PATCH 13/13] fix test
---
 .../apache/druid/indexing/worker/IntermediaryDataManager.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index 0d0ca11f8b91..95acb22536ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -310,6 +310,7 @@ long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment,
       final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
       if (destFile != null) {
+        FileUtils.forceMkdirParent(destFile);
         org.apache.druid.java.util.common.FileUtils.writeAtomically(
             destFile,
             out -> Files.asByteSource(tempZippedFile).copyTo(out)
@@ -320,6 +321,7 @@ long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment,
             subTaskId,
             destFile
         );
+        return unzippedSizeBytes;
       }
     }
     throw new ISE("Can't find location to handle segment[%s]", segment);
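
Note on the design these patches converge on: the race being fixed is a classic check-then-act bug. canHandle() and the subsequent addFile()/addSegmentDir() were separate synchronized calls, so two threads could both observe spare capacity and then both write, overshooting maxSize. The sketch below is a minimal, self-contained model of the reserve-based fix, not the Druid implementation; ReservingLocation, reserve(), and release() are illustrative names for this note only.

import java.io.File;
import java.util.HashSet;
import java.util.Set;

// Minimal model of the reserve() pattern: the capacity check and the size bookkeeping
// happen under the same lock, so the check-then-act race between threads cannot occur.
class ReservingLocation
{
  private final File path;
  private final long maxSizeBytes;
  private final Set<File> files = new HashSet<>();
  private long currSizeBytes = 0;

  ReservingLocation(File path, long maxSizeBytes)
  {
    this.path = path;
    this.maxSizeBytes = maxSizeBytes;
  }

  // Atomically checks capacity and records the reservation.
  // Returns the destination file on success, or null when this location is full.
  synchronized File reserve(String relativePath, long sizeBytes)
  {
    final File fileToAdd = new File(path, relativePath);
    if (files.contains(fileToAdd) || maxSizeBytes - currSizeBytes < sizeBytes) {
      return null;
    }
    files.add(fileToAdd);
    currSizeBytes += sizeBytes;
    return fileToAdd;
  }

  // Rolls a reservation back, e.g. after a failed copy, so the space can be reused.
  synchronized void release(File file, long sizeBytes)
  {
    if (files.remove(file)) {
      currSizeBytes -= sizeBytes;
    }
  }
}

Callers follow reserve, then copy, then release on failure; this is essentially the shape that both IntermediaryDataManager.addSegment() and SegmentLoaderLocalCacheManager.loadSegmentWithRetry() take after this series. Returning the reserved File rather than a boolean also lets the location itself resolve the destination under its path, which is why the final reserve() signature takes a relative path string instead of a File.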