diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java
index efa086d7db80..1a48020905e6 100644
--- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java
@@ -31,18 +31,18 @@ public abstract class CloudStorageConfiguration {
public static final CloudStorageConfiguration DEFAULT = builder().build();
/**
- * Returns path of current working directory. This defaults to the root directory.
+ * @return path of current working directory. This defaults to the root directory.
*/
public abstract String workingDirectory();
/**
- * Returns {@code true} if we shouldn't throw an exception when encountering object names
+ * @return {@code true} if we shouldn't throw an exception when encountering object names
* containing superfluous slashes, e.g. {@code a//b}.
*/
public abstract boolean permitEmptyPathComponents();
/**
- * Returns {@code true} if '/' prefix on absolute object names should be removed before I/O.
+ * @return {@code true} if '/' prefix on absolute object names should be removed before I/O.
*
* <p>If you disable this feature, please take into consideration that all paths created from a
* URI will have the leading slash.
@@ -50,12 +50,12 @@ public abstract class CloudStorageConfiguration {
public abstract boolean stripPrefixSlash();
/**
- * Returns {@code true} if paths with a trailing slash should be treated as fake directories.
+ * @return {@code true} if paths with a trailing slash should be treated as fake directories.
*/
public abstract boolean usePseudoDirectories();
/**
- * Returns block size (in bytes) used when talking to the Google Cloud Storage HTTP server.
+ * @return block size (in bytes) used when talking to the Google Cloud Storage HTTP server.
*/
public abstract int blockSize();
@@ -67,6 +67,8 @@ public abstract class CloudStorageConfiguration {
* <p>Pseudo-directories are enabled, so any path with a trailing slash is a fake directory.
*
+ *
+ * @return builder
*/
public static Builder builder() {
return new Builder();
@@ -89,6 +91,8 @@ public static final class Builder {
* {@link CloudStorageFileSystem} object.
*
* @throws IllegalArgumentException if {@code path} is not absolute.
+ * @param path the new current working directory
+ * @return builder
*/
public Builder workingDirectory(String path) {
checkArgument(UnixPath.getPath(false, path).isAbsolute(), "not absolute: %s", path);
@@ -99,6 +103,9 @@ public Builder workingDirectory(String path) {
/**
* Configures whether or not we should throw an exception when encountering object names
* containing superfluous slashes, e.g. {@code a//b}.
+ *
+ * @param value whether to permit empty path components (will throw if false)
+ * @return builder
*/
public Builder permitEmptyPathComponents(boolean value) {
permitEmptyPathComponents = value;
@@ -110,6 +117,9 @@ public Builder permitEmptyPathComponents(boolean value) {
*
* If you disable this feature, please take into consideration that all paths created from a
* URI will have the leading slash.
+ *
+ * @param value if true, remove the '/' prefix on absolute object names
+ * @return builder
*/
public Builder stripPrefixSlash(boolean value) {
stripPrefixSlash = value;
@@ -118,6 +128,9 @@ public Builder stripPrefixSlash(boolean value) {
/**
* Configures if paths with a trailing slash should be treated as fake directories.
+ *
+ * @param value whether paths with a trailing slash should be treated as fake directories.
+ * @return builder
*/
public Builder usePseudoDirectories(boolean value) {
usePseudoDirectories = value;
@@ -128,6 +141,9 @@ public Builder usePseudoDirectories(boolean value) {
* Sets the block size in bytes that should be used for each HTTP request to the API.
*
* <p>The default is {@value CloudStorageFileSystem#BLOCK_SIZE_DEFAULT}.
+ *
+ * @param value block size in bytes that should be used for each HTTP request to the API.
+ * @return builder
*/
public Builder blockSize(int value) {
blockSize = value;
@@ -136,6 +152,8 @@ public Builder blockSize(int value) {
/**
* Creates new instance without destroying builder.
+ *
+ * @return CloudStorageConfiguration with the parameters you asked for.
*/
public CloudStorageConfiguration build() {
return new AutoValue_CloudStorageConfiguration(
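As a rough usage sketch (illustrative only, not part of this change; the bucket name and blockSize value are made up), the builder methods documented above compose like this:

    // assumes: import com.google.cloud.storage.contrib.nio.*;
    CloudStorageConfiguration config = CloudStorageConfiguration.builder()
        .permitEmptyPathComponents(true)   // tolerate object names like "a//b"
        .stripPrefixSlash(true)            // drop the leading '/' before I/O
        .usePseudoDirectories(true)        // treat trailing-slash paths as fake directories
        .blockSize(8 * 1024 * 1024)        // bytes per HTTP request
        .workingDirectory("/")
        .build();
    CloudStorageFileSystem fs = CloudStorageFileSystem.forBucket("my-bucket", config);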
diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java
index ff77f0b96ab6..369f9ece864b 100644
--- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java
@@ -29,49 +29,49 @@
public interface CloudStorageFileAttributes extends BasicFileAttributes {
/**
- * Returns HTTP etag hash of object contents.
+ * @return HTTP etag hash of object contents.
*
* @see "https://developers.google.com/storage/docs/hashes-etags"
*/
Optional<String> etag();
/**
- * Returns mime type (e.g. text/plain), if set.
+ * @return mime type (e.g. text/plain), if set.
*
* @see "http://en.wikipedia.org/wiki/Internet_media_type#List_of_common_media_types"
*/
Optional<String> mimeType();
/**
- * Returns access control list.
+ * @return access control list.
*
* @see "https://developers.google.com/storage/docs/reference-headers#acl"
*/
Optional<List<Acl>> acl();
/**
- * Returns {@code Cache-Control} HTTP header value, if set.
+ * @return {@code Cache-Control} HTTP header value, if set.
*
* @see "https://developers.google.com/storage/docs/reference-headers#cachecontrol"
*/
Optional<String> cacheControl();
/**
- * Returns {@code Content-Encoding} HTTP header value, if set.
+ * @return {@code Content-Encoding} HTTP header value, if set.
*
* @see "https://developers.google.com/storage/docs/reference-headers#contentencoding"
*/
Optional<String> contentEncoding();
/**
- * Returns {@code Content-Disposition} HTTP header value, if set.
+ * @return {@code Content-Disposition} HTTP header value, if set.
*
* @see "https://developers.google.com/storage/docs/reference-headers#contentdisposition"
*/
Optional<String> contentDisposition();
/**
- * Returns user-specified metadata.
+ * @return user-specified metadata.
*
* @see "https://developers.google.com/storage/docs/reference-headers#contentdisposition"
*/
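A hedged sketch of how these attributes might be read through the standard NIO API (assuming the provider supports CloudStorageFileAttributes in Files.readAttributes; bucket and object names are made up):

    // assumes: import java.nio.file.*; import com.google.cloud.storage.contrib.nio.*;
    Path path = CloudStorageFileSystem.forBucket("my-bucket").getPath("object.txt");
    CloudStorageFileAttributes attrs =
        Files.readAttributes(path, CloudStorageFileAttributes.class);
    System.out.println("mime type: " + attrs.mimeType());
    System.out.println("etag: " + attrs.etag());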
diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java
index 60a39fb5a817..f2d83849e025 100644
--- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java
@@ -84,6 +84,8 @@ public static CloudStorageFileSystem forBucket(String bucket) {
/**
* Creates new file system instance for {@code bucket}, with customizable settings.
*
+ * @param bucket the bucket to access
+ * @param config configuration
* @see #forBucket(String)
*/
@CheckReturnValue
@@ -107,6 +109,10 @@ public static CloudStorageFileSystem forBucket(String bucket, CloudStorageConfig
* from using that if possible, for the reasons documented in
* {@link CloudStorageFileSystemProvider#newFileSystem(URI, java.util.Map)}
*
+ * @param bucket the bucket to access
+ * @param config configuration
+ * @param storageOptions storage options
+ *
* @see java.nio.file.FileSystems#getFileSystem(URI)
*/
@CheckReturnValue
@@ -132,14 +138,14 @@ public CloudStorageFileSystemProvider provider() {
}
/**
- * Returns Cloud Storage bucket name being served by this file system.
+ * @return Cloud Storage bucket name being served by this file system.
*/
public String bucket() {
return bucket;
}
/**
- * Returns configuration object for this file system instance.
+ * @return configuration object for this file system instance.
*/
public CloudStorageConfiguration config() {
return config;
@@ -147,6 +153,9 @@ public CloudStorageConfiguration config() {
/**
* Converts Cloud Storage object name to a {@link Path} object.
+ *
+ * @param first cloud storage object name
+ * @return Path object
*/
@Override
public CloudStoragePath getPath(String first, String... more) {
@@ -168,7 +177,7 @@ public void close() throws IOException {
}
/**
- * Returns {@code true}, even if you previously called the {@link #close()} method.
+ * @return {@code true}, even if you previously called the {@link #close()} method.
*/
@Override
public boolean isOpen() {
@@ -176,7 +185,7 @@ public boolean isOpen() {
}
/**
- * Returns {@code false}.
+ * @return {@code false}.
*/
@Override
public boolean isReadOnly() {
@@ -184,7 +193,7 @@ public boolean isReadOnly() {
}
/**
- * Returns {@value UnixPath#SEPARATOR}.
+ * @return {@value UnixPath#SEPARATOR}.
*/
@Override
public String getSeparator() {
diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java
index 727327d215fd..33b61a89adbb 100644
--- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java
@@ -121,6 +121,8 @@ protected Path computeNext() {
/**
* Sets options that are only used by the constructor.
+ *
+ * @param newStorageOptions options that are only used by the constructor
*/
@VisibleForTesting
public static void setStorageOptions(StorageOptions newStorageOptions) {
@@ -170,7 +172,7 @@ public CloudStorageFileSystem getFileSystem(URI uri) {
}
/**
- * Returns Cloud Storage file system, provided a URI with no path, e.g. {@code gs://bucket}.
+ * @return Cloud Storage file system, provided a URI with no path, e.g. {@code gs://bucket}.
*
* @param uri bucket and current working directory, e.g. {@code gs://bucket}
* @param env map of configuration options, whose keys correspond to the method names of
@@ -226,8 +228,13 @@ public SeekableByteChannel newByteChannel(
private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption> options)
throws IOException {
initStorage();
+ // null indicates we shouldn't add a prefetcher
+ SeekableByteChannelPrefetcherOptions prefetcherOptions = null;
+
for (OpenOption option : options) {
- if (option instanceof StandardOpenOption) {
+ if (option instanceof SeekableByteChannelPrefetcherOptions) {
+ prefetcherOptions = (SeekableByteChannelPrefetcherOptions)option;
+ } else if (option instanceof StandardOpenOption) {
switch ((StandardOpenOption) option) {
case READ:
// Default behavior.
@@ -255,6 +262,15 @@ private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption>
if (cloudPath.seemsLikeADirectoryAndUsePseudoDirectories()) {
throw new CloudStoragePseudoDirectoryException(cloudPath);
}
+ if (prefetcherOptions!=null) {
+ // layer a prefetcher on top, using the provided options
+ List<SeekableByteChannel> chans = new ArrayList<>();
+ for (int i=0; i <
* <p>The default is {@value CloudStorageFileSystem#BLOCK_SIZE_DEFAULT}.
+ *
+ * @param size block size (in bytes)
+ * @return the corresponding option
*/
public static CloudStorageOption.OpenCopy withBlockSize(int size) {
return OptionBlockSize.create(size);
diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java
new file mode 100644
index 000000000000..6f77dd1d8307
--- /dev/null
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java
@@ -0,0 +1,476 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage.contrib.nio;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.util.ArrayList;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * SeekableByteChannelPrefetcher wraps an existing SeekableByteChannel to add prefetching.
+ * The prefetching is done on different threads, so you can write simple code that repeatedly
+ * calls read() to get data, processes it, and then calls read again -- and yet this
+ * simple code overlaps computation and communication for you.
+ * (Of course this is only worthwhile if the underlying SeekableByteChannel doesn't already
+ * implement prefetching).
+ *
+ * The prefetcher can read with multiple threads in parallel, and it can keep a buffered
+ * copy of already-read bytes just in case the caller doesn't follow a strictly linear pattern.
+ * It can also optionally use an extra thread in case of out-of-order reads to fetch the data at
+ * once even if all prefetching threads are busy.
+ *
+ */
+public class SeekableByteChannelPrefetcher implements SeekableByteChannel {
+
+ // normal-case number of parallel reads.
+ private final int prefetchingThreads;
+ // in case the data we need isn't being prefetched, we can use up to this many
+ // extra threads to fetch user-requested data.
+ private final int extraThreads;
+ // size in bytes for our buffer. Every fetcher grabs one buffer at a time.
+ private final int bufferSize;
+ // how many buffers we keep around. Should be at least prefetchingThreads + extraThreads.
+ // bufferSize * bufferCount is how much memory this class'll allocate.
+ private final int bufferCount;
+ // size of the underlying channel(s).
+ private final long size;
+ // where we pretend to be, wrt returning bytes from read()
+ private long position;
+ // whether we're open.
+ private boolean open = true;
+ private boolean closing;
+
+ private final ExecutorService exec;
+ private final Sorted buffers;
+ private final ArrayList<Worker> idleWorkers;
+ private final int workerCount;
+
+ private class Buffer {
+ // index*bufferSize = file position. Set to -1 when we haven't yet decided.
+ public long index;
+ public Future<ByteBuffer> promise;
+
+ public Buffer(int size) {
+ ByteBuffer bb = ByteBuffer.allocate(size);
+ this.index = -1;
+ this.promise = Futures.immediateFuture(bb);
+ }
+ }
+
+ // Holds all of the buffers. They are either being filled, or already full.
+ // A thin wrapper around TreeMap, so we can change away from the
+ // red-black tree if it's too heavy for this scale.
+ private class Sorted {
+ TreeMap<Long, Buffer> data;
+ Queue<Buffer> byAge;
+
+ public Sorted(int bufferCount) {
+ data = new TreeMap<>();
+ byAge = new ArrayBlockingQueue<>(bufferCount + 1);
+ }
+
+ public void put(long index, Buffer item) {
+ item.index = index;
+ data.put(index, item);
+ byAge.add(item);
+ }
+ public Buffer get(long index) {
+ return data.get(index);
+ }
+ public Buffer getOldest() {
+ Buffer oldest = byAge.remove();
+ data.remove(oldest.index);
+ oldest.index = -1;
+ return oldest;
+ }
+
+ public int size() {
+ return byAge.size();
+ }
+ }
+
+ private class Worker implements Callable<ByteBuffer>, Closeable {
+ ByteBuffer bb;
+ long pos;
+ SeekableByteChannel chan;
+
+ public Worker(SeekableByteChannel chan) {
+ this.chan = chan;
+ }
+
+ public void init(long pos, Buffer buf) throws ExecutionException, InterruptedException {
+ if (!buf.promise.isDone()) {
+ throw new RuntimeException("Cannot download onto buffer that's still being filled");
+ }
+ ByteBuffer bb = buf.promise.get();
+ bb.clear();
+ buf.index = pos / bufferSize;
+ this.bb = bb;
+ this.pos = pos;
+ }
+
+ public ByteBuffer call() throws IOException, ExecutionException, InterruptedException {
+ if (pos > chan.size()) {
+ reassignWorker(this);
+ return null;
+ }
+ chan.position(pos);
+ ByteBuffer b = this.bb;
+ while (chan.read(b) > 0 && !closing) {
+ // read until buffer is full, or EOF
+ }
+ reassignWorker(this);
+ return b;
+ }
+
+ public void close() throws IOException {
+ chan.close();
+ }
+ }
+
+ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, Iterable<SeekableByteChannel> channels) throws IOException {
+ this.bufferCount = opts.bufferCount;
+ this.bufferSize = opts.bufferSize;
+ buffers = new Sorted(bufferCount);
+ this.prefetchingThreads = opts.prefetchingThreads;
+ this.extraThreads = opts.extraThreads;
+ this.idleWorkers = new ArrayList<>(this.prefetchingThreads + this.extraThreads);
+ this.exec = Executors.newFixedThreadPool(prefetchingThreads + extraThreads);
+ SeekableByteChannel chan = null;
+ ImmutableList<SeekableByteChannel> underlyingChannels = ImmutableList.copyOf(channels);
+ for (SeekableByteChannel bc : underlyingChannels) {
+ chan = bc;
+ idleWorkers.add(new Worker(bc));
+ }
+ workerCount = underlyingChannels.size();
+ size = chan.size();
+ position = 0;
+ }
+
+ /**
+ * Reads a sequence of bytes from this channel into the given buffer.
+ *
+ * @param dst destination buffer
+ */
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ ByteBuffer src;
+ try {
+ src = fetch(position);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return 0;
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ int bytesToCopy = dst.remaining();
+ byte[] array = src.array();
+ // src.position is how far we've written into the array.
+ // This should always be all the way to the end, unless we've hit EOF.
+ long blockIndex = position / bufferSize;
+ // |----blocksize----|----blocksize----|
+ // <-->|
+ // | ^ position
+ // ^ offset
+ int offset = (int)(position - (blockIndex * bufferSize));
+ int availableToCopy = src.position() - offset;
+ if (availableToCopy < bytesToCopy) {
+ bytesToCopy = availableToCopy;
+ }
+ dst.put(array, offset, bytesToCopy);
+ position += bytesToCopy;
+ if (availableToCopy == 0) {
+ // EOF
+ return -1;
+ }
+ return bytesToCopy;
+ }
+
+ /**
+ * Writing isn't supported.
+ */
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new NonWritableChannelException();
+ }
+
+ /**
+ * Returns this channel's position.
+ *
+ * @return This channel's position,
+ * a non-negative integer counting the number of bytes
+ * from the beginning of the entity to the current position
+ * @throws ClosedChannelException If this channel is closed
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long position() throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ return position;
+ }
+
+ /**
+ * Sets this channel's position.
+ *
+ * Setting the position to a value that is greater than the current size
+ * is legal but does not change the size of the entity. A later attempt to
+ * read bytes at such a position will immediately return an end-of-file
+ * indication. A later attempt to write bytes at such a position will cause
+ * the entity to grow to accommodate the new bytes; the values of any bytes
+ * between the previous end-of-file and the newly-written bytes are
+ * unspecified.
+ *
+ * <p>Setting the channel's position is not recommended when connected to
+ * an entity, typically a file, that is opened with the
+ * StandardOpenOption.APPEND option. When opened for
+ * append, the position is first advanced to the end before writing.
+ *
+ * @param newPosition The new position, a non-negative integer counting
+ * the number of bytes from the beginning of the entity
+ * @return This channel
+ * @throws ClosedChannelException If this channel is closed
+ * @throws IllegalArgumentException If the new position is negative
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ position = newPosition;
+ return this;
+ }
+
+ /**
+ * Returns the current size of entity to which this channel is connected.
+ *
+ * @return The current size, measured in bytes
+ * @throws ClosedChannelException If this channel is closed
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ public long size() throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ return size;
+ }
+
+ /**
+ * Not supported.
+ */
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new NonWritableChannelException();
+ }
+
+ /**
+ * Tells whether or not this channel is open.
+ *
+ * @return true if, and only if, this channel is open
+ */
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ /**
+ * Closes this channel.
+ *
+ * <p>After a channel is closed, any further attempt to invoke I/O
+ * operations upon it will cause a {@link ClosedChannelException} to be
+ * thrown.
+ *
+ * <p>If this channel is already closed then invoking this method has no
+ * effect.
+ *
+ * <p>This method may be invoked at any time. If some other thread has
+ * already invoked it, however, then another invocation will block until
+ * the first invocation is complete, after which it will return without
+ * effect.
+ *
+ * @throws IOException If an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ if (open) {
+ closing = true;
+ exec.shutdown();
+ try {
+ while (true) {
+ synchronized (idleWorkers) {
+ if (idleWorkers.size() >= workerCount) {
+ // every thread is idle, we're done.
+ break;
+ }
+ idleWorkers.wait(60_000);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ try {
+ exec.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ exec.shutdownNow();
+ }
+ // Close all underlying channels
+ for (Worker w : idleWorkers) {
+ w.close();
+ }
+ open = false;
+ }
+ }
+
+ // Return a buffer at this position, blocking if necessary.
+ // Start a background read of the buffer after this one (if there isn't one already).
+ public ByteBuffer fetch(long position) throws InterruptedException, ExecutionException {
+ long index = position / bufferSize;
+ Buffer buf = buffers.get(index);
+ if (null != buf) {
+ return buf.promise.get();
+ }
+ // we don't have data. Point a fetching thread at it.
+ Buffer newBuf = getEmptyBuffer();
+ Worker newWorker = getIdleWorker();
+ sicWorker(newWorker, position, newBuf);
+ startPrefetching(position);
+ // now we wait.
+ return newBuf.promise.get();
+ }
+
+ private Worker getIdleWorker() throws InterruptedException {
+ while (true) {
+ synchronized (idleWorkers) {
+ while (idleWorkers.isEmpty()) {
+ idleWorkers.wait();
+ }
+ if (!idleWorkers.isEmpty()) {
+ return idleWorkers.remove(0);
+ }
+ }
+ }
+ }
+
+ private Worker tryGetIdleWorker() throws InterruptedException {
+ while (true) {
+ synchronized (idleWorkers) {
+ if (closing) {
+ return null;
+ }
+ if (!idleWorkers.isEmpty()) {
+ return idleWorkers.remove(0);
+ }
+ return null;
+ }
+ }
+ }
+
+ private void sicWorker(Worker worker, long pos, Buffer toFill) throws ExecutionException, InterruptedException {
+ long bucketStart = beginningOfBucket(pos);
+ worker.init(bucketStart, toFill);
+ Future<ByteBuffer> promise = exec.submit(worker);
+ toFill.promise = promise;
+ buffers.put(index(bucketStart), toFill);
+ }
+
+ private long index(long pos) {
+ return pos / bufferSize;
+ }
+
+ private long beginningOfBucket(long pos) {
+ return index(pos) * bufferSize;
+ }
+
+ // Return the oldest buffer, or (if we haven't yet allocated all our buffers)
+ // a newly-allocated buffer.
+ private Buffer getEmptyBuffer() {
+ if (buffers.size() < bufferCount) {
+ return new Buffer(bufferSize);
+ }
+ Buffer candidate = buffers.getOldest();
+ return candidate;
+ }
+
+ // Worker is idle. Do we have work for it?
+ private void reassignWorker(Worker w) throws ExecutionException, InterruptedException {
+ long lastIndex = index(size);
+ long curIndex = index(position);
+ if (!closing) {
+ for (int i = 0; i < prefetchingThreads; i++) {
+ if (i > lastIndex) {
+ break;
+ }
+ if (buffers.get(curIndex + i) == null) {
+ // work for you!
+ Buffer buf = getEmptyBuffer();
+ sicWorker(w, bufferSize * (curIndex + i), buf);
+ return;
+ }
+ }
+ }
+ // nothing to do, return to idle pool
+ synchronized (idleWorkers) {
+ idleWorkers.add(w);
+ idleWorkers.notifyAll();
+ }
+ }
+
+ private void startPrefetching(long position) throws ExecutionException, InterruptedException {
+ if (closing) {
+ return;
+ }
+ long lastIndex = index(size);
+ long curIndex = index(position);
+ for (int i = 0; i < prefetchingThreads; i++) {
+ if (i > lastIndex) {
+ break;
+ }
+ if (buffers.get(curIndex + i) == null) {
+ // work available!
+ Worker w = tryGetIdleWorker();
+ if (null == w) {
+ break;
+ }
+ Buffer buf = getEmptyBuffer();
+ sicWorker(w, bufferSize * (curIndex + i), buf);
+ }
+ }
+ }
+}
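A usage sketch for the prefetcher (it mirrors the integration-test and CountBytes changes below; bucket and object names are made up): passing a SeekableByteChannelPrefetcherOptions instance as an extra OpenOption is all that is needed to get a prefetching read channel.

    // assumes: import java.nio.*; import java.nio.channels.*; import java.nio.file.*;
    // plus com.google.cloud.storage.contrib.nio.*
    Path path = CloudStorageFileSystem.forBucket("my-bucket").getPath("big-object");
    SeekableByteChannelPrefetcherOptions opts = new SeekableByteChannelPrefetcherOptions();
    try (SeekableByteChannel chan =
        Files.newByteChannel(path, StandardOpenOption.READ, opts)) {
      ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
      while (chan.read(buf) > 0) {
        buf.flip();
        // ... process the bytes just read ...
        buf.clear();
      }
    }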
diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java
new file mode 100644
index 000000000000..eadb4c64a716
--- /dev/null
+++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage.contrib.nio;
+
+/**
+ * Options for the SeekableByteChannelPrefetcher. Mutating them after creating
+ * the SeekableByteChannelPrefetcher has no effect.
+ */
+public class SeekableByteChannelPrefetcherOptions implements java.nio.file.OpenOption {
+ // normal-case number of parallel reads.
+ public int prefetchingThreads = 2;
+ // in case the data we need isn't being prefetched, we can use up to this many
+ // extra threads to fetch user-requested data.
+ public int extraThreads = 1;
+ // size in bytes for our buffer. Every fetcher grabs one buffer at a time.
+ public int bufferSize = 50 * 1024 * 1024;
+ // how many buffers we keep around. Should be at least prefetchingThreads + extraThreads.
+ // bufferSize * bufferCount is how much memory this class'll allocate.
+ public int bufferCount = 4;
+}
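The defaults above suit mixed access patterns; as a sketch, a caller expecting strictly sequential reads could tune the fields the way the CountBytes change below does:

    SeekableByteChannelPrefetcherOptions opts = new SeekableByteChannelPrefetcherOptions();
    opts.extraThreads = 0;                      // no out-of-order reads expected
    opts.bufferCount = opts.prefetchingThreads; // memory used is roughly bufferSize * bufferCount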
diff --git a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java
index 602b2d7ff03f..0075272cf6fb 100644
--- a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java
+++ b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java
@@ -21,6 +21,7 @@
import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration;
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
+import com.google.cloud.storage.contrib.nio.SeekableByteChannelPrefetcherOptions;
import com.google.common.collect.ImmutableList;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
@@ -140,10 +141,24 @@ public void testFileSize() throws IOException {
@Test(timeout = 60_000)
public void testReadByteChannel() throws IOException {
+ innerTestReadByteChannel(false);
+ }
+
+ @Test(timeout = 60_000)
+ public void testReadByteChannelWithPrefetch() throws IOException {
+ innerTestReadByteChannel(true);
+ }
+
+ private void innerTestReadByteChannel(boolean prefetch) throws IOException {
CloudStorageFileSystem testBucket = getTestBucket();
Path path = testBucket.getPath(SML_FILE);
long size = Files.size(path);
- SeekableByteChannel chan = Files.newByteChannel(path, StandardOpenOption.READ);
+ SeekableByteChannel chan;
+ if (prefetch) {
+ chan = Files.newByteChannel(path, StandardOpenOption.READ, new SeekableByteChannelPrefetcherOptions());
+ } else {
+ chan = Files.newByteChannel(path, StandardOpenOption.READ);
+ }
assertThat(chan.size()).isEqualTo(size);
ByteBuffer buf = ByteBuffer.allocate(SML_SIZE);
int read = 0;
@@ -162,10 +177,20 @@ public void testReadByteChannel() throws IOException {
byte[] expected = new byte[SML_SIZE];
new Random(SML_SIZE).nextBytes(expected);
assertThat(Arrays.equals(buf.array(), expected)).isTrue();
+ chan.close();
}
@Test
public void testSeek() throws IOException {
+ innerTestSeek(false);
+ }
+
+ @Test
+ public void testSeekWithPrefetch() throws IOException {
+ innerTestSeek(true);
+ }
+
+ private void innerTestSeek(boolean prefetch) throws IOException {
CloudStorageFileSystem testBucket = getTestBucket();
Path path = testBucket.getPath(BIG_FILE);
int size = BIG_SIZE;
@@ -173,7 +198,12 @@ public void testSeek() throws IOException {
byte[] sample = new byte[100];
byte[] wanted;
byte[] wanted2;
- SeekableByteChannel chan = Files.newByteChannel(path, StandardOpenOption.READ);
+ SeekableByteChannel chan;
+ if (prefetch) {
+ chan = Files.newByteChannel(path, StandardOpenOption.READ, new SeekableByteChannelPrefetcherOptions());
+ } else {
+ chan = Files.newByteChannel(path, StandardOpenOption.READ);
+ }
assertThat(chan.size()).isEqualTo(size);
// check seek
@@ -191,6 +221,7 @@ public void testSeek() throws IOException {
// if the two spots in the file have the same contents, then this isn't a good file for this
// test.
assertThat(wanted).isNotEqualTo(wanted2);
+ chan.close();
}
@Test
diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java
index 755f74f294a9..9df3bb94ff6e 100644
--- a/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java
+++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java
@@ -16,6 +16,7 @@
package com.google.cloud.examples.nio;
+import com.google.cloud.storage.contrib.nio.SeekableByteChannelPrefetcherOptions;
import com.google.common.base.Stopwatch;
import com.google.common.io.BaseEncoding;
@@ -24,6 +25,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
+import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
@@ -52,18 +54,29 @@ public static void main(String[] args) throws IOException {
help();
return;
}
+ boolean prefetch = false;
for (String a : args) {
- countFile(a);
+ if ("--prefetch".equals(a)) {
+ prefetch = true;
+ continue;
+ }
+ countFile(a, prefetch);
}
}
/**
* Print the length of the indicated file.
*
+ * Prefetching here is just an example; in this scenario there isn't that much to be gained.
+ *
+ * <p>Prefetching is most useful in a scenario where the caller issues many small sequential reads
+ * and does some computation in between. Prefetching results in fewer larger reads that are
+ * issued in parallel with the computation.
+ *
* <p>This uses the normal Java NIO API, so it can take advantage of any installed
* NIO Filesystem provider without any extra effort.
*/
- private static void countFile(String fname) {
+ private static void countFile(String fname, boolean prefetch) {
// large buffers pay off
final int bufSize = 50 * 1024 * 1024;
try {
@@ -71,9 +84,17 @@ private static void countFile(String fname) {
long size = Files.size(path);
System.out.println(fname + ": " + size + " bytes.");
ByteBuffer buf = ByteBuffer.allocate(bufSize);
- System.out.println("Reading the whole file...");
+ System.out.println("Reading the whole file... (prefetch: " + prefetch + ")");
Stopwatch sw = Stopwatch.createStarted();
- try (SeekableByteChannel chan = Files.newByteChannel(path)) {
+ OpenOption[] options = new OpenOption[0];
+ if (prefetch) {
+ final SeekableByteChannelPrefetcherOptions opts = new SeekableByteChannelPrefetcherOptions();
+ // Configure for perfectly sequential access.
+ opts.extraThreads = 0;
+ opts.bufferCount = opts.prefetchingThreads;
+ options = new OpenOption[]{opts};
+ }
+ try (SeekableByteChannel chan = Files.newByteChannel(path, options)) {
long total = 0;
int readCalls = 0;
MessageDigest md = MessageDigest.getInstance("MD5");