Merged
2 changes: 1 addition & 1 deletion .travis.yml
@@ -86,7 +86,7 @@ jobs:
${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL}
- sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
- free -m
-      - ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
+      - travis_wait 15 ${MVN} -pl ${MAVEN_PROJECTS} jacoco:report
Contributor:
I think this change would be orthogonal to the PR no?

Member Author:
No, this is needed since SuperSorterTest sometimes runs for more than 10 minutes and doesn't produce any log output even in the successful case, so the wait for the test to finish is increased to 15 minutes.

# Add merge target branch to determine diff (see https://github.com/travis-ci/travis-ci/issues/6069).
# This is not needed for build triggered by tags, since there will be no code diff.
- echo "TRAVIS_BRANCH=${TRAVIS_BRANCH}" # for debugging
@@ -65,7 +65,7 @@ public interface StorageConnector
boolean pathExists(String path) throws IOException;

/**
-   * Reads the data present at the path the underlying storage system. Most implementations prepend the input path
+   * Reads the data present at the path in the underlying storage system. Most implementations prepend the input path
* with a basePath.
* The caller should take care of closing the stream when done or in case of error.
*
@@ -75,6 +75,19 @@ public interface StorageConnector
*/
InputStream read(String path) throws IOException;

/**
* Reads the data present for a given range at the path in the underlying storage system.
* Most implementations prepend the input path with a basePath.
* The caller should take care of closing the stream when done or in case of error. Further, the caller must ensure
* that the start offset and the size of the read are valid parameters for the given path for correct behavior.
* @param path The path to read data from
* @param from Start offset of the read in the path
* @param size Length of the read to be done
* @return InputStream starting from the given offset limited by the given size
* @throws IOException if the path is not present or the data at the path cannot be read
*/
InputStream readRange(String path, long from, long size) throws IOException;
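For callers of the interface, a minimal sketch of how the `readRange` contract is meant to be used. The in-memory connector below and every name in it are illustrative only, not part of this PR; it just mirrors the documented behavior: the caller passes a valid `[from, from + size)` range, gets back a stream limited to that range, and is responsible for closing it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory stand-in for a StorageConnector, used only to
// illustrate the readRange contract documented above.
public class InMemoryRangeReadDemo
{
  private final byte[] data;

  public InMemoryRangeReadDemo(byte[] data)
  {
    this.data = data;
  }

  // Valid ranges only: from >= 0, size >= 0, from + size <= data length.
  public InputStream readRange(String path, long from, long size) throws IOException
  {
    if (from < 0 || size < 0 || from + size > data.length) {
      throw new IOException("Invalid range for path " + path);
    }
    return new ByteArrayInputStream(data, (int) from, (int) size);
  }

  public static void main(String[] args) throws IOException
  {
    InMemoryRangeReadDemo demo =
        new InMemoryRangeReadDemo("Hello, Druid!".getBytes(StandardCharsets.UTF_8));
    // Read 5 bytes starting at offset 7; the stream ends after those 5 bytes.
    try (InputStream in = demo.readRange("ignored", 7, 5)) {
      byte[] buf = in.readAllBytes();
      System.out.println(new String(buf, StandardCharsets.UTF_8)); // prints "Druid"
    }
  }
}
```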
Contributor:
Should the behaviour of the implementations be defined here under the following conditions:

  1. from > size of underlying storage (undefined, error, or we return empty output)
  2. from + size > size of underlying storage (undefined, error, or we chomp the stream)
  3. from < 0 (undefined, error, or from indicates index from the end)
  4. size < 0 (undefined, error, or we read in reverse)

Member Author:

Have added some validation checks for inputs in the local and S3 connectors. Haven't done the size-based check in the S3 connector yet, since it'll require a status API call for the object first; I'm not sure that check is necessary as of now.


/**
* Open an {@link OutputStream} for writing data to the path in the underlying storage system.
* Most implementations prepend the input path with a basePath.
@@ -19,17 +19,22 @@

package org.apache.druid.storage.local;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.StorageConnector;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -55,16 +60,32 @@ public boolean pathExists(String path)
return fileWithBasePath(path).exists();
}

/**
* Reads the file present as basePath + path. Will throw an IO exception in case the file is not present.
* Closing of the stream is the responsibility of the caller.
*/
@Override
public InputStream read(String path) throws IOException
{
return Files.newInputStream(fileWithBasePath(path).toPath());
}

@Override
public InputStream readRange(String path, long from, long size) throws IOException
Contributor (@cryptoe, Jan 16, 2023):

nit: `from` -> `start` seems better. Feel free to ignore this.

{
if (!pathExists(path)) {
throw new FileNotFoundException("Unable to find file " + fileWithBasePath(path).toPath() + " for reading");
}
long length = fileWithBasePath(path).length();
if (from < 0 || size < 0 || (from + size) > length) {
throw new IAE(
"Invalid arguments for reading %s. from = %d, readSize = %d, fileSize = %d",
fileWithBasePath(path).toPath(),
from,
size,
length
);
}
FileChannel fileChannel = FileChannel.open(fileWithBasePath(path).toPath(), StandardOpenOption.READ);
return new BoundedInputStream(Channels.newInputStream(fileChannel.position(from)), size);
}
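The implementation above validates the range and then bounds the stream with commons-io's `BoundedInputStream`. Below is a stdlib-only sketch of the same range-read technique, with the bound enforced via `readNBytes` instead; the class and file names are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RangeReadSketch
{
  // Open the file, seek to `from`, and read at most `size` bytes, similar to
  // the readRange implementation above but bounding the read manually instead
  // of wrapping the stream in commons-io's BoundedInputStream.
  static byte[] readRange(Path file, long from, long size) throws IOException
  {
    long length = Files.size(file);
    if (from < 0 || size < 0 || from + size > length) {
      throw new IllegalArgumentException(
          "Invalid range [" + from + ", " + (from + size) + ") for file of size " + length);
    }
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
         InputStream in = Channels.newInputStream(channel.position(from))) {
      return in.readNBytes((int) size);
    }
  }

  public static void main(String[] args) throws IOException
  {
    Path tmp = Files.createTempFile("range-read", ".txt");
    Files.write(tmp, "Hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(readRange(tmp, 1, 3), StandardCharsets.UTF_8)); // prints "ell"
    Files.delete(tmp);
  }
}
```

Unlike the connector, this sketch reads the range eagerly and closes the channel before returning; the PR's version hands back a lazy stream and relies on the caller to close it.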

/**
* Writes the file present with the materialized location as basePath + path.
* In case the parent directory does not exist, we create the parent dir recursively.
@@ -35,6 +35,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -151,6 +152,33 @@ public void listFilesTest() throws Exception
);
}

@Test
public void testReadRange() throws Exception
rohangarg marked this conversation as resolved.
{
String uuid = UUID.randomUUID().toString();
String data = "Hello";
try (OutputStream outputStream = storageConnector.write(uuid)) {
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
}

// non empty reads
for (int start = 0; start < data.length(); start++) {
for (int length = 1; length <= data.length() - start; length++) {
InputStream is = storageConnector.readRange(uuid, start, length);
byte[] dataBytes = new byte[length];
Assert.assertEquals(is.read(dataBytes), length);
Assert.assertEquals(is.read(), -1); // reading further produces no data
Assert.assertEquals(data.substring(start, start + length), new String(dataBytes, StandardCharsets.UTF_8));
}
}

// empty read
InputStream is = storageConnector.readRange(uuid, 0, 0);
byte[] dataBytes = new byte[0];
Assert.assertEquals(is.read(dataBytes), -1);
Assert.assertEquals(data.substring(0, 0), new String(dataBytes, StandardCharsets.UTF_8));
}

private void checkContents(String uuid) throws IOException
{
try (InputStream inputStream = storageConnector.read(uuid)) {
2 changes: 2 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -324,6 +324,8 @@ The following table lists the context parameters for the MSQ task engine:
| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging of the worker sketches. Can be `PARALLEL`, `SEQUENTIAL` or `AUTO`. See [Sketch Merging Mode](#sketch-merging-mode) for more information. | `PARALLEL` |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `composedIntermediateSuperSorterStorageEnabled` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable automatic fallback from local storage to durable storage for sorting's intermediate data. Requires the `intermediateSuperSorterStorageMaxLocalBytes` limit to be set for local storage and the durable shuffle storage feature to be enabled as well. | `false` |
| `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br /><br /> Byte limit on local storage for sorting's intermediate data. If the limit is crossed, the task fails with a `ResourceLimitExceededException`. | `9223372036854775807` |
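Taken together, a query context enabling the fallback might look like the following sketch. The values are illustrative only, and durable storage must also be configured at the server level via `druid.msq.intermediate.storage.enable=true`:

```json
{
  "durableShuffleStorage": true,
  "composedIntermediateSuperSorterStorageEnabled": true,
  "intermediateSuperSorterStorageMaxLocalBytes": 100000000
}
```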

## Sketch Merging Mode
This section details the advantages and performance of various Cluster By Statistics Merge Modes.
@@ -53,6 +53,7 @@
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
@@ -153,7 +154,6 @@
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.DimensionSchemaUtils;
@@ -293,6 +293,9 @@ public ControllerImpl(
{
this.task = task;
this.context = context;
this.isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(
task.getQuerySpec().getQuery().context()
);
}

@Override
@@ -587,6 +590,25 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
.orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
}

ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
taskContextOverridesBuilder
.put(
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
isDurableStorageEnabled
).put(
MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(
task.getQuerySpec().getQuery().context()
)
).put(
Contributor:
Shouldn't CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES be set only when CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE is set?

Member Author:

I think they can work independently - the local bytes limit just enforces a resource limit on the local storage and the composition storage enables joining local disk and durable storage together.
Currently, the CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES is defaulted as Long.MAX_VALUE and is used in the normal flow as well to keep consistency. WDYT?

Contributor:

I think we should throw an error if one sets CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE and CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES is not set.

Otherwise users will set this flag thinking stuff is going to work, but they would be proved wrong.
As @paul-rogers says, throwing errors is a way to force people to read the documentation ^-^

Member Author:

Yes, I agree with the documentation part, but setting CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES is also not enough for composition. The user would also have to enable the durable storage setting and all the configurations along with it. And in the future, if any in-memory storage is allowed in composition, then that is needed too.
So, I thought the documentation for the composition parameter should clearly state everything that is needed to configure it properly.

MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES,
MultiStageQueryContext.getIntermediateSuperSorterStorageMaxLocalBytes(
task.getQuerySpec().getQuery().context()
)
).put(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
maxParseExceptions
);
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
Expand All @@ -600,8 +622,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
}
});
},
isDurableStorageEnabled,
maxParseExceptions,
taskContextOverridesBuilder.build(),
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 + ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
);