diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 31405fe4e7c2..52bf08357976 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -126,6 +126,11 @@ public File getTaskWorkDir(String taskId) return new File(getTaskDir(taskId), "work"); } + public File getTaskTempDir(String taskId) + { + return new File(getTaskDir(taskId), "temp"); + } + public File getTaskLockFile(String taskId) { return new File(getTaskDir(taskId), "lock"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index c47425a4ee87..95acb22536ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -33,11 +33,13 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CompressionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -45,6 +47,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import 
java.util.Collections; @@ -57,12 +60,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * This class manages intermediary segments for data shuffle between native parallel index tasks. - * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers + * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer) * and phase 2 tasks read those files via HTTP. * * The directory where segment files are placed is structured as @@ -75,11 +80,12 @@ @ManageLifecycle public class IntermediaryDataManager { - private static final Logger log = new Logger(IntermediaryDataManager.class); + private static final Logger LOG = new Logger(IntermediaryDataManager.class); private final long intermediaryPartitionDiscoveryPeriodSec; private final long intermediaryPartitionCleanupPeriodSec; private final Period intermediaryPartitionTimeout; + private final TaskConfig taskConfig; private final List shuffleDataLocations; private final IndexingServiceClient indexingServiceClient; @@ -108,6 +114,7 @@ public IntermediaryDataManager( this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec(); this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec(); this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout(); + this.taskConfig = taskConfig; this.shuffleDataLocations = taskConfig .getShuffleDataLocations() .stream() @@ -119,6 +126,7 @@ public IntermediaryDataManager( @LifecycleStart public void start() { + discoverSupervisorTaskPartitions(); supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d"); // Discover partitions for new 
supervisorTasks supervisorTaskChecker.scheduleAtFixedRate( @@ -127,7 +135,7 @@ public void start() discoverSupervisorTaskPartitions(); } catch (Exception e) { - log.warn(e, "Error while discovering supervisorTasks"); + LOG.warn(e, "Error while discovering supervisorTasks"); } }, intermediaryPartitionDiscoveryPeriodSec, @@ -141,10 +149,10 @@ public void start() deleteExpiredSuprevisorTaskPartitionsIfNotRunning(); } catch (InterruptedException e) { - log.error(e, "Error while cleaning up partitions for expired supervisors"); + LOG.error(e, "Error while cleaning up partitions for expired supervisors"); } catch (Exception e) { - log.warn(e, "Error while cleaning up partitions for expired supervisors"); + LOG.warn(e, "Error while cleaning up partitions for expired supervisors"); } }, intermediaryPartitionCleanupPeriodSec, @@ -163,9 +171,13 @@ public void stop() throws InterruptedException supervisorTaskCheckTimes.clear(); } + /** + * IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data. + */ private void discoverSupervisorTaskPartitions() { for (StorageLocation location : shuffleDataLocations) { + final Path locationPath = location.getPath().toPath().toAbsolutePath(); final MutableInt numDiscovered = new MutableInt(0); final File[] dirsPerSupervisorTask = location.getPath().listFiles(); if (dirsPerSupervisorTask != null) { @@ -174,13 +186,32 @@ private void discoverSupervisorTaskPartitions() supervisorTaskCheckTimes.computeIfAbsent( supervisorTaskId, k -> { + for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) { + final String relativeSegmentPath = locationPath + .relativize(eachFile.toPath().toAbsolutePath()) + .toString(); + // StorageLocation keeps track of how much storage capacity is being used. + // Newly found files should be known to the StorageLocation to keep it up to date. 
+ final File reservedFile = location.reserve( + relativeSegmentPath, + eachFile.getName(), + eachFile.length() + ); + if (reservedFile == null) { + LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath()); + } + } numDiscovered.increment(); return DateTimes.nowUtc().plus(intermediaryPartitionTimeout); } ); } } - log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue()); + LOG.info( + "Discovered partitions for [%s] new supervisor tasks under location[%s]", + numDiscovered.getValue(), + location.getPath() + ); } } @@ -203,7 +234,7 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup } } - log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size()); + LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size()); final Map taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks); for (Entry entry : taskStatuses.entrySet()) { @@ -215,7 +246,7 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup deletePartitions(supervisorTaskId); } catch (IOException e) { - log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId); + LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId); } } else { // If it's still running, update last access time. @@ -227,17 +258,74 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - * - * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static - * addSegment method. 
*/ - public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) + long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) + throws IOException { + // Get or create the location iterator for supervisorTask. final Iterator iterator = locationIterators.computeIfAbsent( supervisorTaskId, - k -> Iterators.cycle(shuffleDataLocations) + k -> { + final Iterator cyclicIterator = Iterators.cycle(shuffleDataLocations); + // Random start of the iterator + final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size()); + IntStream.range(0, random).forEach(i -> cyclicIterator.next()); + return cyclicIterator; + } ); - addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile); + + // Create a zipped segment in a temp directory. + final File taskTempDir = taskConfig.getTaskTempDir(subTaskId); + + try (final Closer resourceCloser = Closer.create()) { + if (taskTempDir.mkdirs()) { + resourceCloser.register(() -> { + try { + FileUtils.forceDelete(taskTempDir); + } + catch (IOException e) { + LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath()); + } + }); + } + + // Temporary compressed file. Will be removed when taskTempDir is deleted. 
+ final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); + final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile); + if (unzippedSizeBytes == 0) { + throw new IOE( + "Read 0 bytes from segmentDir[%s]", + segmentDir.getAbsolutePath() + ); + } + + // Try copying the zipped segment to one of storage locations + for (int i = 0; i < shuffleDataLocations.size(); i++) { + final StorageLocation location = iterator.next(); + final String partitionFilePath = getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length()); + if (destFile != null) { + FileUtils.forceMkdirParent(destFile); + org.apache.druid.java.util.common.FileUtils.writeAtomically( + destFile, + out -> Files.asByteSource(tempZippedFile).copyTo(out) + ); + LOG.info( + "Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]", + segment.getId(), + subTaskId, + destFile + ); + return unzippedSizeBytes; + } + } + throw new ISE("Can't find location to handle segment[%s]", segment); + } } public List findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) @@ -259,7 +347,7 @@ public void deletePartitions(String supervisorTaskId) throws IOException for (StorageLocation location : shuffleDataLocations) { final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId); if (supervisorTaskPath.exists()) { - log.info("Cleaning up [%s]", supervisorTaskPath); + LOG.info("Cleaning up [%s]", supervisorTaskPath); for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) { location.removeFile(eachFile); } @@ -269,54 +357,6 @@ public void deletePartitions(String supervisorTaskId) throws IOException supervisorTaskCheckTimes.remove(supervisorTaskId); } - /** - * Iterate through the given storage locations to find one which can handle the given 
segment. - */ - public static void addSegment( - Iterator cyclicIterator, - int numLocations, - String supervisorTaskId, - String subTaskId, - DataSegment segment, - File segmentFile - ) - { - for (int i = 0; i < numLocations; i++) { - final StorageLocation location = cyclicIterator.next(); - final File destFile = location.reserve( - getPartitionFilePath( - supervisorTaskId, - subTaskId, - segment.getInterval(), - segment.getShardSpec().getPartitionNum() - ), - segment.getId(), - segmentFile.length() - ); - if (destFile != null) { - try { - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } else { - return; - } - } - catch (IOException e) { - // Only log here to try other locations as well. - log.warn(e, "Failed to write segmentFile at [%s]", destFile); - location.removeFile(segmentFile); - } - } - } - throw new ISE("Can't find location to handle segment[%s]", segment); - } - private static String getPartitionFilePath( String supervisorTaskId, String subTaskId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java new file mode 100644 index 000000000000..fcbdf9d6c3fd --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.timeline.DataSegment; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +/** + * DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel + * indexing. 
+ */ +public class ShuffleDataSegmentPusher implements DataSegmentPusher +{ + private final String supervisorTaskId; + private final String subTaskId; + private final IntermediaryDataManager intermediaryDataManager; + + public ShuffleDataSegmentPusher( + String supervisorTaskId, + String subTaskId, + IntermediaryDataManager intermediaryDataManager + ) + { + this.supervisorTaskId = supervisorTaskId; + this.subTaskId = subTaskId; + this.intermediaryDataManager = intermediaryDataManager; + } + + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file); + return segment.withSize(unzippedSize) + .withBinaryVersion(SegmentUtils.getVersionFromDir(file)); + } + + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + throw new UnsupportedOperationException(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java index c158c0e33a50..6a5f611fdf5a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.worker; -import com.amazonaws.util.StringUtils; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -43,6 +42,7 @@ import java.io.File; import java.io.IOException; +import 
java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -116,19 +116,20 @@ public void testCleanup() throws IOException, InterruptedException { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - final File segmentFile = generateSegmentFile(); + final File segmentFile = generateSegmentDir("test"); final DataSegment segment = newSegment(interval, 0); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); - Thread.sleep(8000); + Thread.sleep(3000); Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty()); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java index 7bd9e906a96b..7d322d080d30 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.worker; -import com.amazonaws.util.StringUtils; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import 
org.apache.druid.client.indexing.IndexingServiceClient; @@ -41,6 +40,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; @@ -67,7 +67,7 @@ public void setup() throws IOException false, null, null, - ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null)) + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null)) ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); @@ -83,15 +83,16 @@ public void teardown() throws InterruptedException @Test public void testAddSegmentFailure() throws IOException { - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(Intervals.of("2018/2019"), i); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Can't find location to handle segment"); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(Intervals.of("2018/2019"), 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(Intervals.of("2018/2019"), 4); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } @@ -101,15 +102,15 @@ public void testFindPartitionFiles() throws IOException final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); final int partitionId = 0; - for (int i = 0; i < 10; i++) { - final File segmentFile = generateSegmentFile(); + for (int i = 0; i < 4; i++) { + final File segmentFile = generateSegmentDir("file_" + i); final DataSegment segment = newSegment(interval, 
partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile); } final List files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId); - Assert.assertEquals(10, files.size()); + Assert.assertEquals(4, files.size()); files.sort(Comparator.comparing(File::getName)); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 4; i++) { Assert.assertEquals("subTaskId_" + i, files.get(i).getName()); } } @@ -119,9 +120,9 @@ public void deletePartitions() throws IOException { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int partitionId = 0; partitionId < 5; partitionId++) { - for (int subTaskId = 0; subTaskId < 3; subTaskId++) { - final File segmentFile = generateSegmentFile(); + for (int partitionId = 0; partitionId < 2; partitionId++) { + for (int subTaskId = 0; subTaskId < 2; subTaskId++) { + final File segmentFile = generateSegmentDir("file_" + partitionId + "_" + subTaskId); final DataSegment segment = newSegment(interval, partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile); } @@ -129,7 +130,7 @@ public void deletePartitions() throws IOException intermediaryDataManager.deletePartitions(supervisorTaskId); - for (int partitionId = 0; partitionId < 5; partitionId++) { + for (int partitionId = 0; partitionId < 2; partitionId++) { Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty()); } } @@ -139,22 +140,24 @@ public void testAddRemoveAdd() throws IOException { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(interval, i); 
intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } intermediaryDataManager.deletePartitions(supervisorTaskId); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(interval, 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(interval, i); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java new file mode 100644 index 000000000000..c3dfcdd66e60 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import com.google.common.primitives.Ints; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.FileUtils.FileCopyResult; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.utils.CompressionUtils; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class ShuffleDataSegmentPusherTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private IntermediaryDataManager intermediaryDataManager; + private ShuffleDataSegmentPusher segmentPusher; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig(); + final TaskConfig 
taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + intermediaryDataManager.start(); + segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager); + } + + @After + public void teardown() throws InterruptedException + { + intermediaryDataManager.stop(); + } + + @Test + public void testPush() throws IOException + { + final File segmentDir = generateSegmentDir(); + final DataSegment segment = newSegment(Intervals.of("2018/2019")); + final DataSegment pushed = segmentPusher.push(segmentDir, segment, true); + + Assert.assertEquals(9, pushed.getBinaryVersion().intValue()); + Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version + + final List files = intermediaryDataManager.findPartitionFiles( + "supervisorTaskId", + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + Assert.assertEquals(1, files.size()); + final File zippedSegment = files.get(0); + final File tempDir = temporaryFolder.newFolder(); + final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir); + final List unzippedFiles = new ArrayList<>(result.getFiles()); + unzippedFiles.sort(Comparator.comparing(File::getName)); + final File dataFile = unzippedFiles.get(0); + Assert.assertEquals("test", dataFile.getName()); + Assert.assertEquals("test data.", Files.readFirstLine(dataFile, StandardCharsets.UTF_8)); + final File versionFile = unzippedFiles.get(1); + Assert.assertEquals("version.bin", versionFile.getName()); + Assert.assertArrayEquals(Ints.toByteArray(0x9), Files.toByteArray(versionFile)); + } + + private File generateSegmentDir() throws IOException + { + // Each file size is 
138 bytes after compression + final File segmentDir = temporaryFolder.newFolder(); + Files.asByteSink(new File(segmentDir, "version.bin")).write(Ints.toByteArray(0x9)); + FileUtils.write(new File(segmentDir, "test"), "test data.", StandardCharsets.UTF_8); + return segmentDir; + } + + private DataSegment newSegment(Interval interval) + { + return new DataSegment( + "dataSource", + interval, + "version", + null, + null, + null, + new NumberedShardSpec(0, 0), + 9, + 0 + ); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 842295ebb9fb..6ed86a3c2146 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -115,7 +114,7 @@ public synchronized void removeSegmentDir(File segmentDir, DataSegment segment) @Nullable public synchronized File reserve(String segmentDir, DataSegment segment) { - return reserve(segmentDir, segment.getId(), segment.getSize()); + return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } /** @@ -124,7 +123,7 @@ public synchronized File reserve(String segmentDir, DataSegment segment) * Returns null otherwise. 
*/ @Nullable - public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize) + public synchronized File reserve(String segmentFilePathToAdd, String segmentId, long segmentSize) { final File segmentFileToAdd = new File(path, segmentFilePathToAdd); if (files.contains(segmentFileToAdd)) { @@ -145,7 +144,7 @@ public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentI */ @VisibleForTesting @GuardedBy("this") - boolean canHandle(SegmentId segmentId, long segmentSize) + boolean canHandle(String segmentId, long segmentSize) { if (availableSizeBytes() < segmentSize) { log.warn( diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java index cb0821717668..f617261068ce 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -153,8 +153,6 @@ public void testGetSegment() throws IOException, ExecutionException, Interrupted for (Future future : futures) { future.get(); } - - System.out.println(manager.getLocations().get(0).availableSizeBytes()); } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 23da42298783..7be5328de5be 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -39,18 +39,18 @@ public void testStorageLocationFreePercent() { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - 
Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000)); - Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000)); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000)); - Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000)); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000)); - Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000)); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -99,7 +99,7 @@ private void verifyLoc(long maxSize, StorageLocation loc) { Assert.assertEquals(maxSize, loc.availableSizeBytes()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); } }