From dc572624fde2a7dab715e78ebc8b5c063922fb05 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Fri, 18 Nov 2016 15:54:51 -0800 Subject: [PATCH 1/6] Multithreaded prefetcher option for newReadChannel Benefits: - the caller can run computation in between read calls, and data is being fetched in parallel with that computation. - the caller can issue many small sequential reads, and the system instead sends larger reads, which is more efficient. --- .../nio/CloudStorageFileSystemProvider.java | 16 +- .../nio/SeekableByteChannelPrefetcher.java | 437 ++++++++++++++++++ .../SeekableByteChannelPrefetcherOptions.java | 34 ++ .../storage/contrib/nio/it/ITGcsNio.java | 33 +- .../google/cloud/examples/nio/CountBytes.java | 29 +- 5 files changed, 542 insertions(+), 7 deletions(-) create mode 100644 google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java create mode 100644 google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java index 727327d215fd..2eee54ed5f54 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java @@ -226,8 +226,13 @@ public SeekableByteChannel newByteChannel( private SeekableByteChannel newReadChannel(Path path, Set options) throws IOException { initStorage(); + // null indicates we shouldn't add a prefetcher + SeekableByteChannelPrefetcherOptions prefetcherOptions = null; + for (OpenOption option : options) { - if (option instanceof StandardOpenOption) { + if (option instanceof SeekableByteChannelPrefetcherOptions) { + prefetcherOptions = (SeekableByteChannelPrefetcherOptions)option; + } else if (option instanceof StandardOpenOption) { switch ((StandardOpenOption) option) { case READ: // Default behavior. @@ -255,6 +260,15 @@ private SeekableByteChannel newReadChannel(Path path, Set if (cloudPath.seemsLikeADirectoryAndUsePseudoDirectories()) { throw new CloudStoragePseudoDirectoryException(cloudPath); } + if (prefetcherOptions!=null) { + // layer a prefetcher on top, using the provided options + List chans = new ArrayList<>(); + for (int i=0; i idleWorkers; + + private class Buffer { + // index*bufferSize = file position. Set to -1 when we haven't yet decided. + public long index; + public Future promise; + + public Buffer(int size) { + ByteBuffer bb = ByteBuffer.allocate(size); + this.index = -1; + this.promise = Futures.immediateFuture(bb); + } + } + + // Holds all of the buffers. They are either being filled, or already full. + // A thin wrapper around TreeMap, so we can change away from the + // red-black tree if it's too heavy for this scale. + private class Sorted { + TreeMap data; + Queue byAge; + + public Sorted(int bufferCount) { + data = new TreeMap<>(); + byAge = new ArrayBlockingQueue(bufferCount+1); + } + + public void put(long index, Buffer item) { + item.index = index; + data.put(index, item); + byAge.add(item); + } + public Buffer get(long index) { + return data.get(index); + } + public Buffer getOldest() { + Buffer oldest = byAge.remove(); + data.remove(oldest.index); + oldest.index = -1; + return oldest; + } + + public int size() { + return byAge.size(); + } + } + + private class Worker implements Callable, Closeable { + ByteBuffer bb = null; + long pos; + SeekableByteChannel chan; + + public Worker(SeekableByteChannel chan) { + this.chan = chan; + } + + public void init(long pos, Buffer buf) throws ExecutionException, InterruptedException { + if (!buf.promise.isDone()) { + throw new RuntimeException("Cannot download onto buffer that's still being filled"); + } + ByteBuffer bb = buf.promise.get(); + bb.clear(); + buf.index = pos / bufferSize; + this.bb = bb; + this.pos = pos; + } + + public ByteBuffer call() throws IOException, ExecutionException, InterruptedException { + if (pos > chan.size()) { + return null; + } + chan.position(pos); + ByteBuffer b = this.bb; + // read until buffer is full, or EOF + while (chan.read(b) > 0 && !closing) {} + reassignWorker(this); + return b; + } + + public void close() throws IOException { + chan.close(); + } + } + + public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, Iterable channels) throws IOException { + this.bufferCount = opts.bufferCount; + this.bufferSize = opts.bufferSize; + buffers = new Sorted(bufferCount); + this.prefetchingThreads = opts.prefetchingThreads; + this.extraThreads = opts.extraThreads; + this.idleWorkers = new ArrayList<>(this.prefetchingThreads + this.extraThreads); + this.exec = Executors.newFixedThreadPool(prefetchingThreads + extraThreads); + SeekableByteChannel chan = null; + for (SeekableByteChannel bc : channels) { + chan = bc; + idleWorkers.add(new Worker(bc)); + } + size = chan.size(); + } + + /** + * Reads a sequence of bytes from this channel into the given buffer. + * + * @param dst + */ + @Override + public int read(ByteBuffer dst) throws IOException { + if (!open) throw new ClosedChannelException(); + ByteBuffer src; + try { + src = fetch(position); + } catch (InterruptedException e) { + return 0; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + int bytesToCopy = dst.remaining(); + byte[] array = src.array(); + // src.position is how far we've written into the array. + // This should always be all the way to the end, unless we've hit EOF. + long blockIndex = position / bufferSize; + // |----blocksize----|----blocksize----| + // <-->| + // | ^ position + // ^ offset + int offset = (int)(position - (blockIndex * bufferSize)); + int availableToCopy = src.position() - offset; + if (availableToCopy < bytesToCopy) { + bytesToCopy = availableToCopy; + } + dst.put(array, offset, bytesToCopy); + position += bytesToCopy; + if (availableToCopy == 0) { + // EOF + return -1; + } + return bytesToCopy; + } + + /** + * Writing isn't supported. + */ + @Override + public int write(ByteBuffer src) throws IOException { + throw new NonWritableChannelException(); + } + + /** + * Returns this channel's position. + * + * @return This channel's position, + * a non-negative integer counting the number of bytes + * from the beginning of the entity to the current position + * @throws ClosedChannelException If this channel is closed + * @throws IOException If some other I/O error occurs + */ + @Override + public long position() throws IOException { + if (!open) throw new ClosedChannelException(); + return position; + } + + /** + * Sets this channel's position. + *

+ *

Setting the position to a value that is greater than the current size + * is legal but does not change the size of the entity. A later attempt to + * read bytes at such a position will immediately return an end-of-file + * indication. A later attempt to write bytes at such a position will cause + * the entity to grow to accommodate the new bytes; the values of any bytes + * between the previous end-of-file and the newly-written bytes are + * unspecified. + *

+ *

Setting the channel's position is not recommended when connected to + * an entity, typically a file, that is opened with the {@link + * StandardOpenOption#APPEND APPEND} option. When opened for + * append, the position is first advanced to the end before writing. + * + * @param newPosition The new position, a non-negative integer counting + * the number of bytes from the beginning of the entity + * @return This channel + * @throws ClosedChannelException If this channel is closed + * @throws IllegalArgumentException If the new position is negative + * @throws IOException If some other I/O error occurs + */ + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + if (!open) throw new ClosedChannelException(); + position = newPosition; + return this; + } + + /** + * Returns the current size of entity to which this channel is connected. + * + * @return The current size, measured in bytes + * @throws ClosedChannelException If this channel is closed + * @throws IOException If some other I/O error occurs + */ + @Override + public long size() throws IOException { + if (!open) throw new ClosedChannelException(); + return size; + } + + /** + * Not supported. + */ + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new NonWritableChannelException(); + } + + /** + * Tells whether or not this channel is open. + * + * @return true if, and only if, this channel is open + */ + @Override + public boolean isOpen() { + return open; + } + + /** + * Closes this channel. + *

+ *

After a channel is closed, any further attempt to invoke I/O + * operations upon it will cause a {@link ClosedChannelException} to be + * thrown. + *

+ *

If this channel is already closed then invoking this method has no + * effect. + *

+ *

This method may be invoked at any time. If some other thread has + * already invoked it, however, then another invocation will block until + * the first invocation is complete, after which it will return without + * effect.

+ * + * @throws IOException If an I/O error occurs + */ + @Override + public void close() throws IOException { + if (open) { + // TODO: quiet everything, close all channels. + exec.shutdown(); + try { + exec.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + exec.shutdownNow(); + } + open = false; + } + } + + // Return a buffer at this position, blocking if necessary. + // Start a background read of the buffer after this one (if there isn't one already). + public ByteBuffer fetch(long position) throws InterruptedException, ExecutionException { + long index = position / bufferSize; + Buffer buf = buffers.get(index); + if (null != buf) { + return buf.promise.get(); + } + // we don't have data. Point a fetching thread at it. + Buffer newBuf = getEmptyBuffer(); + Worker newWorker = getIdleWorker(); + sicWorker(newWorker, position, newBuf); + startPrefetching(position); + // now we wait. + return newBuf.promise.get(); + } + + private Worker getIdleWorker() throws InterruptedException { + while (true) { + synchronized (idleWorkers) { + while (idleWorkers.size() == 0) { + idleWorkers.wait(); + } + if (idleWorkers.size() > 0) { + return idleWorkers.remove(0); + } + } + } + } + + private Worker tryGetIdleWorker() throws InterruptedException { + while (true) { + synchronized (idleWorkers) { + if (idleWorkers.size() > 0) { + return idleWorkers.remove(0); + } + return null; + } + } + } + + private void sicWorker(Worker worker, long pos, Buffer toFill) throws ExecutionException, InterruptedException { + pos = beginningOfBucket(pos); + worker.init(pos, toFill); + Future promise = exec.submit(worker); + toFill.promise = promise; + buffers.put(index(pos), toFill); + } + + private long index(long pos) { + return pos / bufferSize; + } + + private long beginningOfBucket(long pos) { + return index(pos) * bufferSize; + } + + // Return the oldest buffer, or (if we haven't yet allocated all our buffers) + // a newly-allocated buffer. + private Buffer getEmptyBuffer() { + if (buffers.size() < bufferCount) { + return new Buffer(bufferSize); + } + Buffer candidate = buffers.getOldest(); + return candidate; + } + + // Worker is idle. Do we have work for it? + private void reassignWorker(Worker w) throws ExecutionException, InterruptedException { + long lastIndex = index(size); + long curIndex = index(position); + if (!closing) { + for (int i = 0; i < prefetchingThreads; i++) { + if (i > lastIndex) break; + if (buffers.get(curIndex + i) == null) { + // work for you! + Buffer buf = getEmptyBuffer(); + sicWorker(w, bufferSize * (curIndex + i), buf); + return; + } + } + } + // nothing to do, return to idle pool + synchronized (idleWorkers) { + idleWorkers.add(w); + idleWorkers.notify(); + } + } + + private void startPrefetching(long position) throws ExecutionException, InterruptedException { + if (closing) { + return; + } + long lastIndex = index(size); + long curIndex = index(position); + for (int i = 0; i < prefetchingThreads; i++) { + if (i > lastIndex) break; + if (buffers.get(curIndex + i) == null) { + // work available! + Worker w = tryGetIdleWorker(); + if (null == w) { + break; + } + Buffer buf = getEmptyBuffer(); + sicWorker(w, bufferSize * (curIndex + i), buf); + return; + } + } + } +} diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java new file mode 100644 index 000000000000..d9c1dc71f375 --- /dev/null +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java @@ -0,0 +1,34 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.contrib.nio; + +/** + * Options for the SeekableByteChannelPrefetcher. Mutating them after creating + * the SeekableByteChannelPrefetcher has no effect. + */ +public class SeekableByteChannelPrefetcherOptions implements java.nio.file.OpenOption { + // normal-case number of parallel reads. + public int prefetchingThreads = 4; + // in case the data we need isn't being prefetched, we can use up to this many + // extra threads to fetch user-requested data. + public int extraThreads = 1; + // size in bytes for our buffer. Every fetcher grabs one buffer at a time. + public int bufferSize = 50 * 1024 * 1024; + // how many buffers we keep around. Should be at least prefetchingThreads + extraThreads. + // bufferSize * bufferCount is how much memory this class'll allocate. + public int bufferCount = 6; +} diff --git a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java index 602b2d7ff03f..bedcb5d01097 100644 --- a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java +++ b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java @@ -21,6 +21,7 @@ import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration; import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem; +import com.google.cloud.storage.contrib.nio.SeekableByteChannelPrefetcherOptions; import com.google.common.collect.ImmutableList; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BucketInfo; @@ -140,10 +141,24 @@ public void testFileSize() throws IOException { @Test(timeout = 60_000) public void testReadByteChannel() throws IOException { + innerTestReadByteChannel(false); + } + + @Test(timeout = 60_000) + public void testReadByteChannelWithPrefetch() throws IOException { + innerTestReadByteChannel(true); + } + + private void innerTestReadByteChannel(boolean prefetch) throws IOException { CloudStorageFileSystem testBucket = getTestBucket(); Path path = testBucket.getPath(SML_FILE); long size = Files.size(path); - SeekableByteChannel chan = Files.newByteChannel(path, StandardOpenOption.READ); + SeekableByteChannel chan; + if (prefetch) { + chan = Files.newByteChannel(path, StandardOpenOption.READ, new SeekableByteChannelPrefetcherOptions()); + } else { + chan = Files.newByteChannel(path, StandardOpenOption.READ); + } assertThat(chan.size()).isEqualTo(size); ByteBuffer buf = ByteBuffer.allocate(SML_SIZE); int read = 0; @@ -166,6 +181,15 @@ public void testReadByteChannel() throws IOException { @Test public void testSeek() throws IOException { + innerTestSeek(false); + } + + @Test + public void testSeekWithPrefetch() throws IOException { + innerTestSeek(true); + } + + private void innerTestSeek(boolean prefetch) throws IOException { CloudStorageFileSystem testBucket = getTestBucket(); Path path = testBucket.getPath(BIG_FILE); int size = BIG_SIZE; @@ -173,7 +197,12 @@ public void testSeek() throws IOException { byte[] sample = new byte[100]; byte[] wanted; byte[] wanted2; - SeekableByteChannel chan = Files.newByteChannel(path, StandardOpenOption.READ); + SeekableByteChannel chan; + if (prefetch) { + chan = Files.newByteChannel(path, StandardOpenOption.READ, new SeekableByteChannelPrefetcherOptions()); + } else { + chan = Files.newByteChannel(path, StandardOpenOption.READ); + } assertThat(chan.size()).isEqualTo(size); // check seek diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java index 755f74f294a9..9df3bb94ff6e 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java @@ -16,6 +16,7 @@ package com.google.cloud.examples.nio; +import com.google.cloud.storage.contrib.nio.SeekableByteChannelPrefetcherOptions; import com.google.common.base.Stopwatch; import com.google.common.io.BaseEncoding; @@ -24,6 +25,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; +import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.Paths; import java.security.MessageDigest; @@ -52,18 +54,29 @@ public static void main(String[] args) throws IOException { help(); return; } + boolean prefetch = false; for (String a : args) { - countFile(a); + if ("--prefetch".equals(a)) { + prefetch = true; + continue; + } + countFile(a, prefetch); } } /** * Print the length of the indicated file. * + *

Prefetching here is just an example; in this scenario there isn't that much to be gained. + * + *

Prefetching is most useful in a scenario where the caller issues many small sequential reads + * and does some computation in between. Prefetching results in fewer larger reads that are + * issued in parallel with the computation. + * *

This uses the normal Java NIO Api, so it can take advantage of any installed * NIO Filesystem provider without any extra effort. */ - private static void countFile(String fname) { + private static void countFile(String fname, boolean prefetch) { // large buffers pay off final int bufSize = 50 * 1024 * 1024; try { @@ -71,9 +84,17 @@ private static void countFile(String fname) { long size = Files.size(path); System.out.println(fname + ": " + size + " bytes."); ByteBuffer buf = ByteBuffer.allocate(bufSize); - System.out.println("Reading the whole file..."); + System.out.println("Reading the whole file... (prefetch: " + prefetch + ")"); Stopwatch sw = Stopwatch.createStarted(); - try (SeekableByteChannel chan = Files.newByteChannel(path)) { + OpenOption[] options = new OpenOption[0]; + if (prefetch) { + final SeekableByteChannelPrefetcherOptions opts = new SeekableByteChannelPrefetcherOptions(); + // Configure for perfectly sequential access. + opts.extraThreads = 0; + opts.bufferCount = opts.prefetchingThreads; + options = new OpenOption[]{opts}; + } + try (SeekableByteChannel chan = Files.newByteChannel(path, options)) { long total = 0; int readCalls = 0; MessageDigest md = MessageDigest.getInstance("MD5"); From d22d4cfa5467f4b52af8930342664c6e7c21a791 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Fri, 18 Nov 2016 16:33:18 -0800 Subject: [PATCH 2/6] Channel closing code --- .../nio/SeekableByteChannelPrefetcher.java | 47 +++++++++++++------ .../storage/contrib/nio/it/ITGcsNio.java | 2 + 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java index 45453078819a..b23b39aaf741 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java @@ -16,7 +16,6 @@ package com.google.cloud.storage.contrib.nio; -import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import java.io.Closeable; @@ -65,7 +64,7 @@ public class SeekableByteChannelPrefetcher implements SeekableByteChannel { // size of the underlying channel(s). private final long size; // where we pretend to be, wrt returning bytes from read() - private long position = 0; + private long position; // whether we're open. private boolean open = true; private boolean closing = false; @@ -169,6 +168,7 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, idleWorkers.add(new Worker(bc)); } size = chan.size(); + position = 0; } /** @@ -178,7 +178,9 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, */ @Override public int read(ByteBuffer dst) throws IOException { - if (!open) throw new ClosedChannelException(); + if (!open) { + throw new ClosedChannelException(); + } ByteBuffer src; try { src = fetch(position); @@ -229,7 +231,9 @@ public int write(ByteBuffer src) throws IOException { */ @Override public long position() throws IOException { - if (!open) throw new ClosedChannelException(); + if (!open) { + throw new ClosedChannelException(); + } return position; } @@ -258,7 +262,9 @@ public long position() throws IOException { */ @Override public SeekableByteChannel position(long newPosition) throws IOException { - if (!open) throw new ClosedChannelException(); + if (!open) { + throw new ClosedChannelException(); + } position = newPosition; return this; } @@ -272,7 +278,9 @@ public SeekableByteChannel position(long newPosition) throws IOException { */ @Override public long size() throws IOException { - if (!open) throw new ClosedChannelException(); + if (!open) { + throw new ClosedChannelException(); + } return size; } @@ -314,7 +322,15 @@ public boolean isOpen() { @Override public void close() throws IOException { if (open) { - // TODO: quiet everything, close all channels. + closing = true; + while (true) { + synchronized (idleWorkers) { + if (idleWorkers.size() == prefetchingThreads + extraThreads) { + // every thread is idle, we're done. + break; + } + } + } exec.shutdown(); try { exec.awaitTermination(60, TimeUnit.SECONDS); @@ -345,10 +361,10 @@ public ByteBuffer fetch(long position) throws InterruptedException, ExecutionExc private Worker getIdleWorker() throws InterruptedException { while (true) { synchronized (idleWorkers) { - while (idleWorkers.size() == 0) { + while (idleWorkers.isEmpty()) { idleWorkers.wait(); } - if (idleWorkers.size() > 0) { + if (!idleWorkers.isEmpty()) { return idleWorkers.remove(0); } } @@ -358,7 +374,10 @@ private Worker getIdleWorker() throws InterruptedException { private Worker tryGetIdleWorker() throws InterruptedException { while (true) { synchronized (idleWorkers) { - if (idleWorkers.size() > 0) { + if (closing) { + return null; + } + if (!idleWorkers.isEmpty()) { return idleWorkers.remove(0); } return null; @@ -367,11 +386,11 @@ private Worker tryGetIdleWorker() throws InterruptedException { } private void sicWorker(Worker worker, long pos, Buffer toFill) throws ExecutionException, InterruptedException { - pos = beginningOfBucket(pos); - worker.init(pos, toFill); + long bucketStart = beginningOfBucket(pos); + worker.init(bucketStart, toFill); Future promise = exec.submit(worker); toFill.promise = promise; - buffers.put(index(pos), toFill); + buffers.put(index(bucketStart), toFill); } private long index(long pos) { @@ -410,7 +429,7 @@ private void reassignWorker(Worker w) throws ExecutionException, InterruptedExce // nothing to do, return to idle pool synchronized (idleWorkers) { idleWorkers.add(w); - idleWorkers.notify(); + idleWorkers.notifyAll(); } } diff --git a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java index bedcb5d01097..0075272cf6fb 100644 --- a/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java +++ b/google-cloud-contrib/google-cloud-nio/src/test/java/com/google/cloud/storage/contrib/nio/it/ITGcsNio.java @@ -177,6 +177,7 @@ private void innerTestReadByteChannel(boolean prefetch) throws IOException { byte[] expected = new byte[SML_SIZE]; new Random(SML_SIZE).nextBytes(expected); assertThat(Arrays.equals(buf.array(), expected)).isTrue(); + chan.close(); } @Test @@ -220,6 +221,7 @@ private void innerTestSeek(boolean prefetch) throws IOException { // if the two spots in the file have the same contents, then this isn't a good file for this // test. assertThat(wanted).isNotEqualTo(wanted2); + chan.close(); } @Test From 00e0ab8df8d05726e57a2a031302ea8971b48b92 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Fri, 18 Nov 2016 17:33:49 -0800 Subject: [PATCH 3/6] Closing now actually works --- .../nio/SeekableByteChannelPrefetcher.java | 31 ++++++++++++++----- .../SeekableByteChannelPrefetcherOptions.java | 4 +-- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java index b23b39aaf741..b497123d2db4 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java @@ -16,6 +16,7 @@ package com.google.cloud.storage.contrib.nio; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import java.io.Closeable; @@ -72,6 +73,7 @@ public class SeekableByteChannelPrefetcher implements SeekableByteChannel { private final ExecutorService exec; private final Sorted buffers; private final ArrayList idleWorkers; + private final int workerCount; private class Buffer { // index*bufferSize = file position. Set to -1 when we haven't yet decided. @@ -139,6 +141,7 @@ public void init(long pos, Buffer buf) throws ExecutionException, InterruptedExc public ByteBuffer call() throws IOException, ExecutionException, InterruptedException { if (pos > chan.size()) { + reassignWorker(this); return null; } chan.position(pos); @@ -163,10 +166,12 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, this.idleWorkers = new ArrayList<>(this.prefetchingThreads + this.extraThreads); this.exec = Executors.newFixedThreadPool(prefetchingThreads + extraThreads); SeekableByteChannel chan = null; - for (SeekableByteChannel bc : channels) { + ImmutableList underlyingChannels = ImmutableList.copyOf(channels); + for (SeekableByteChannel bc : underlyingChannels) { chan = bc; idleWorkers.add(new Worker(bc)); } + workerCount = underlyingChannels.size(); size = chan.size(); position = 0; } @@ -323,20 +328,27 @@ public boolean isOpen() { public void close() throws IOException { if (open) { closing = true; - while (true) { - synchronized (idleWorkers) { - if (idleWorkers.size() == prefetchingThreads + extraThreads) { + exec.shutdown(); + try { + while (true) synchronized (idleWorkers) { + if (idleWorkers.size() >= workerCount) { // every thread is idle, we're done. break; } + idleWorkers.wait(60_000); } + } catch (InterruptedException e) { + System.out.println("Timed out while waiting for channels to close."); } - exec.shutdown(); try { exec.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { exec.shutdownNow(); } + // Close all underlying channels + for (Worker w : idleWorkers) { + w.close(); + } open = false; } } @@ -417,7 +429,9 @@ private void reassignWorker(Worker w) throws ExecutionException, InterruptedExce long curIndex = index(position); if (!closing) { for (int i = 0; i < prefetchingThreads; i++) { - if (i > lastIndex) break; + if (i > lastIndex) { + break; + } if (buffers.get(curIndex + i) == null) { // work for you! Buffer buf = getEmptyBuffer(); @@ -440,7 +454,9 @@ private void startPrefetching(long position) throws ExecutionException, Interrup long lastIndex = index(size); long curIndex = index(position); for (int i = 0; i < prefetchingThreads; i++) { - if (i > lastIndex) break; + if (i > lastIndex) { + break; + } if (buffers.get(curIndex + i) == null) { // work available! Worker w = tryGetIdleWorker(); @@ -449,7 +465,6 @@ private void startPrefetching(long position) throws ExecutionException, Interrup } Buffer buf = getEmptyBuffer(); sicWorker(w, bufferSize * (curIndex + i), buf); - return; } } } diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java index d9c1dc71f375..eadb4c64a716 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcherOptions.java @@ -22,7 +22,7 @@ */ public class SeekableByteChannelPrefetcherOptions implements java.nio.file.OpenOption { // normal-case number of parallel reads. - public int prefetchingThreads = 4; + public int prefetchingThreads = 2; // in case the data we need isn't being prefetched, we can use up to this many // extra threads to fetch user-requested data. public int extraThreads = 1; @@ -30,5 +30,5 @@ public class SeekableByteChannelPrefetcherOptions implements java.nio.file.OpenO public int bufferSize = 50 * 1024 * 1024; // how many buffers we keep around. Should be at least prefetchingThreads + extraThreads. // bufferSize * bufferCount is how much memory this class'll allocate. - public int bufferCount = 6; + public int bufferCount = 4; } From 0c1c208321461a143f8302573bf41f2aaaf3efc3 Mon Sep 17 00:00:00 2001 From: JP Martin Date: Mon, 28 Nov 2016 09:21:20 -0800 Subject: [PATCH 4/6] Add javadoc to CloudStorageConfiguration --- .../nio/CloudStorageConfiguration.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java index efa086d7db80..113081599ddd 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java @@ -31,18 +31,18 @@ public abstract class CloudStorageConfiguration { public static final CloudStorageConfiguration DEFAULT = builder().build(); /** - * Returns path of current working directory. This defaults to the root directory. + * @return path of current working directory. This defaults to the root directory. */ public abstract String workingDirectory(); /** - * Returns {@code true} if we shouldn't throw an exception when encountering object names + * @return {@code true} if we shouldn't throw an exception when encountering object names * containing superfluous slashes, e.g. {@code a//b}. */ public abstract boolean permitEmptyPathComponents(); /** - * Returns {@code true} if '/' prefix on absolute object names should be removed before I/O. + * @return {@code true} if '/' prefix on absolute object names should be removed before I/O. * *

If you disable this feature, please take into consideration that all paths created from a * URI will have the leading slash. @@ -50,12 +50,12 @@ public abstract class CloudStorageConfiguration { public abstract boolean stripPrefixSlash(); /** - * Returns {@code true} if paths with a trailing slash should be treated as fake directories. + * @return {@code true} if paths with a trailing slash should be treated as fake directories. */ public abstract boolean usePseudoDirectories(); /** - * Returns block size (in bytes) used when talking to the Google Cloud Storage HTTP server. + * @return block size (in bytes) used when talking to the Google Cloud Storage HTTP server. */ public abstract int blockSize(); @@ -67,6 +67,8 @@ public abstract class CloudStorageConfiguration { *

  • The prefix slash on absolute paths will be removed when converting to an object name. *
  • Pseudo-directories are enabled, so any path with a trailing slash is a fake directory. * + * + * @return builder */ public static Builder builder() { return new Builder(); @@ -89,6 +91,7 @@ public static final class Builder { * {@link CloudStorageFileSystem} object. * * @throws IllegalArgumentException if {@code path} is not absolute. + * @return builder */ public Builder workingDirectory(String path) { checkArgument(UnixPath.getPath(false, path).isAbsolute(), "not absolute: %s", path); @@ -99,6 +102,8 @@ public Builder workingDirectory(String path) { /** * Configures whether or not we should throw an exception when encountering object names * containing superfluous slashes, e.g. {@code a//b}. + * + * @return builder */ public Builder permitEmptyPathComponents(boolean value) { permitEmptyPathComponents = value; @@ -110,6 +115,9 @@ public Builder permitEmptyPathComponents(boolean value) { * *

    If you disable this feature, please take into consideration that all paths created from a * URI will have the leading slash. + * + * @parm value if true, remove the '/' prefix on absolute object names + * @return builder */ public Builder stripPrefixSlash(boolean value) { stripPrefixSlash = value; @@ -118,6 +126,9 @@ public Builder stripPrefixSlash(boolean value) { /** * Configures if paths with a trailing slash should be treated as fake directories. + * + * @parm value whether paths with a trailing slash should be treated as fake directories. + * @return builder */ public Builder usePseudoDirectories(boolean value) { usePseudoDirectories = value; @@ -128,6 +139,9 @@ public Builder usePseudoDirectories(boolean value) { * Sets the block size in bytes that should be used for each HTTP request to the API. * *

    The default is {@value CloudStorageFileSystem#BLOCK_SIZE_DEFAULT}. + * + * @parm value block size in bytes that should be used for each HTTP request to the API. + * @return builder */ public Builder blockSize(int value) { blockSize = value; @@ -136,6 +150,8 @@ public Builder blockSize(int value) { /** * Creates new instance without destroying builder. + * + * @return CloudStorageConfiguration with the parameters you asked for. */ public CloudStorageConfiguration build() { return new AutoValue_CloudStorageConfiguration( From a5e418db632a8a787ce4d15a8023a66b16ed409a Mon Sep 17 00:00:00 2001 From: JP Martin Date: Wed, 30 Nov 2016 15:08:20 -0800 Subject: [PATCH 5/6] cosmetic changes --- .../nio/CloudStorageConfiguration.java | 8 ++-- .../nio/CloudStorageFileAttributes.java | 14 +++---- .../contrib/nio/CloudStorageFileSystem.java | 19 ++++++--- .../nio/CloudStorageFileSystemProvider.java | 4 +- .../contrib/nio/CloudStorageOptions.java | 23 +++++++++++ .../nio/SeekableByteChannelPrefetcher.java | 39 ++++++++++--------- 6 files changed, 73 insertions(+), 34 deletions(-) diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java index 113081599ddd..1a48020905e6 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageConfiguration.java @@ -91,6 +91,7 @@ public static final class Builder { * {@link CloudStorageFileSystem} object. * * @throws IllegalArgumentException if {@code path} is not absolute. + * @param path the new current working directory * @return builder */ public Builder workingDirectory(String path) { @@ -103,6 +104,7 @@ public Builder workingDirectory(String path) { * Configures whether or not we should throw an exception when encountering object names * containing superfluous slashes, e.g. {@code a//b}. * + * @param value whether to permit empty path components (will throw if false) * @return builder */ public Builder permitEmptyPathComponents(boolean value) { @@ -116,7 +118,7 @@ public Builder permitEmptyPathComponents(boolean value) { *

    If you disable this feature, please take into consideration that all paths created from a * URI will have the leading slash. * - * @parm value if true, remove the '/' prefix on absolute object names + * @param value if true, remove the '/' prefix on absolute object names * @return builder */ public Builder stripPrefixSlash(boolean value) { @@ -127,7 +129,7 @@ public Builder stripPrefixSlash(boolean value) { /** * Configures if paths with a trailing slash should be treated as fake directories. * - * @parm value whether paths with a trailing slash should be treated as fake directories. + * @param value whether paths with a trailing slash should be treated as fake directories. * @return builder */ public Builder usePseudoDirectories(boolean value) { @@ -140,7 +142,7 @@ public Builder usePseudoDirectories(boolean value) { * *

    The default is {@value CloudStorageFileSystem#BLOCK_SIZE_DEFAULT}. * - * @parm value block size in bytes that should be used for each HTTP request to the API. + * @param value block size in bytes that should be used for each HTTP request to the API. * @return builder */ public Builder blockSize(int value) { diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java index ff77f0b96ab6..369f9ece864b 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileAttributes.java @@ -29,49 +29,49 @@ public interface CloudStorageFileAttributes extends BasicFileAttributes { /** - * Returns HTTP etag hash of object contents. + * @return HTTP etag hash of object contents. * * @see "https://developers.google.com/storage/docs/hashes-etags" */ Optional etag(); /** - * Returns mime type (e.g. text/plain), if set. + * @return mime type (e.g. text/plain), if set. * * @see "http://en.wikipedia.org/wiki/Internet_media_type#List_of_common_media_types" */ Optional mimeType(); /** - * Returns access control list. + * @return access control list. * * @see "https://developers.google.com/storage/docs/reference-headers#acl" */ Optional> acl(); /** - * Returns {@code Cache-Control} HTTP header value, if set. + * @return {@code Cache-Control} HTTP header value, if set. * * @see "https://developers.google.com/storage/docs/reference-headers#cachecontrol" */ Optional cacheControl(); /** - * Returns {@code Content-Encoding} HTTP header value, if set. + * @return {@code Content-Encoding} HTTP header value, if set. * * @see "https://developers.google.com/storage/docs/reference-headers#contentencoding" */ Optional contentEncoding(); /** - * Returns {@code Content-Disposition} HTTP header value, if set. + * @return {@code Content-Disposition} HTTP header value, if set. * * @see "https://developers.google.com/storage/docs/reference-headers#contentdisposition" */ Optional contentDisposition(); /** - * Returns user-specified metadata. + * @return user-specified metadata. * * @see "https://developers.google.com/storage/docs/reference-headers#contentdisposition" */ diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java index 60a39fb5a817..f2d83849e025 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystem.java @@ -84,6 +84,8 @@ public static CloudStorageFileSystem forBucket(String bucket) { /** * Creates new file system instance for {@code bucket}, with customizable settings. * + * @param bucket the bucket to access + * @param config configuration * @see #forBucket(String) */ @CheckReturnValue @@ -107,6 +109,10 @@ public static CloudStorageFileSystem forBucket(String bucket, CloudStorageConfig * from using that if possible, for the reasons documented in * {@link CloudStorageFileSystemProvider#newFileSystem(URI, java.util.Map)} * + * @param bucket the bucket to access + * @param config configuration + * @param storageOptions storage options + * * @see java.nio.file.FileSystems#getFileSystem(URI) */ @CheckReturnValue @@ -132,14 +138,14 @@ public CloudStorageFileSystemProvider provider() { } /** - * Returns Cloud Storage bucket name being served by this file system. + * @return Cloud Storage bucket name being served by this file system. */ public String bucket() { return bucket; } /** - * Returns configuration object for this file system instance. + * @return configuration object for this file system instance. */ public CloudStorageConfiguration config() { return config; @@ -147,6 +153,9 @@ public CloudStorageConfiguration config() { /** * Converts Cloud Storage object name to a {@link Path} object. + * + * @param first cloud storage object name + * @return Path object */ @Override public CloudStoragePath getPath(String first, String... more) { @@ -168,7 +177,7 @@ public void close() throws IOException { } /** - * Returns {@code true}, even if you previously called the {@link #close()} method. + * @return {@code true}, even if you previously called the {@link #close()} method. */ @Override public boolean isOpen() { @@ -176,7 +185,7 @@ public boolean isOpen() { } /** - * Returns {@code false}. + * @return {@code false}. */ @Override public boolean isReadOnly() { @@ -184,7 +193,7 @@ public boolean isReadOnly() { } /** - * Returns {@value UnixPath#SEPARATOR}. + * @return {@value UnixPath#SEPARATOR}. */ @Override public String getSeparator() { diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java index 2eee54ed5f54..33b61a89adbb 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageFileSystemProvider.java @@ -121,6 +121,8 @@ protected Path computeNext() { /** * Sets options that are only used by the constructor. + * + * @param newStorageOptions options that are only used by the constructor */ @VisibleForTesting public static void setStorageOptions(StorageOptions newStorageOptions) { @@ -170,7 +172,7 @@ public CloudStorageFileSystem getFileSystem(URI uri) { } /** - * Returns Cloud Storage file system, provided a URI with no path, e.g. {@code gs://bucket}. + * @return Cloud Storage file system, provided a URI with no path, e.g. {@code gs://bucket}. * * @param uri bucket and current working directory, e.g. {@code gs://bucket} * @param env map of configuration options, whose keys correspond to the method names of diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageOptions.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageOptions.java index 74293b7a79f2..c7a22a1a0618 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageOptions.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/CloudStorageOptions.java @@ -25,6 +25,9 @@ public final class CloudStorageOptions { /** * Sets the mime type header on an object, e.g. {@code "text/plain"}. + * + * @param mimeType MIME type + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withMimeType(String mimeType) { return OptionMimeType.create(mimeType); @@ -32,6 +35,7 @@ public static CloudStorageOption.OpenCopy withMimeType(String mimeType) { /** * Disables caching on an object. Same as: {@code withCacheControl("no-cache")}. + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withoutCaching() { return withCacheControl("no-cache"); @@ -41,6 +45,9 @@ public static CloudStorageOption.OpenCopy withoutCaching() { * Sets the {@code Cache-Control} HTTP header on an object. * * @see "https://developers.google.com/storage/docs/reference-headers#cachecontrol" + * + * @param cacheControl Cache-Control HTTP header + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withCacheControl(String cacheControl) { return OptionCacheControl.create(cacheControl); @@ -50,6 +57,9 @@ public static CloudStorageOption.OpenCopy withCacheControl(String cacheControl) * Sets the {@code Content-Disposition} HTTP header on an object. * * @see "https://developers.google.com/storage/docs/reference-headers#contentdisposition" + * + * @param contentDisposition Content-Disposition HTTP header + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withContentDisposition(String contentDisposition) { return OptionContentDisposition.create(contentDisposition); @@ -59,6 +69,9 @@ public static CloudStorageOption.OpenCopy withContentDisposition(String contentD * Sets the {@code Content-Encoding} HTTP header on an object. * * @see "https://developers.google.com/storage/docs/reference-headers#contentencoding" + * + * @param contentEncoding content encoding + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withContentEncoding(String contentEncoding) { return OptionContentEncoding.create(contentEncoding); @@ -68,6 +81,9 @@ public static CloudStorageOption.OpenCopy withContentEncoding(String contentEnco * Sets the ACL value on a Cloud Storage object. * * @see "https://developers.google.com/storage/docs/reference-headers#acl" + * + * @param acl ACL value + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withAcl(Acl acl) { return OptionAcl.create(acl); @@ -77,6 +93,10 @@ public static CloudStorageOption.OpenCopy withAcl(Acl acl) { * Sets an unmodifiable piece of user metadata on a Cloud Storage object. * * @see "https://developers.google.com/storage/docs/reference-headers#xgoogmeta" + * + * @param key metadata key + * @param value metadata value + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withUserMetadata(String key, String value) { return OptionUserMetadata.create(key, value); @@ -86,6 +106,9 @@ public static CloudStorageOption.OpenCopy withUserMetadata(String key, String va * Sets the block size (in bytes) when talking to the Google Cloud Storage server. * *

    The default is {@value CloudStorageFileSystem#BLOCK_SIZE_DEFAULT}. + * + * @param size block size (in bytes) + * @return the corresponding option */ public static CloudStorageOption.OpenCopy withBlockSize(int size) { return OptionBlockSize.create(size); diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java index b497123d2db4..882fcbce63b1 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java @@ -44,7 +44,7 @@ * (Of course this is only worthwhile if the underlying SeekableByteChannel doesn't already * implement prefetching). * - * The prefetcher can read with multiple threads in parallel, and it can keep a buffered + *

    The prefetcher can read with multiple threads in parallel, and it can keep a buffered * copy of already-read bytes just in case the caller doesn't follow a strictly linear pattern. * It can also optionally use an extra thread in case of out-of-order reads to fetch the data at * once even if all prefetching threads are busy. @@ -68,7 +68,7 @@ public class SeekableByteChannelPrefetcher implements SeekableByteChannel { private long position; // whether we're open. private boolean open = true; - private boolean closing = false; + private boolean closing; private final ExecutorService exec; private final Sorted buffers; @@ -120,7 +120,7 @@ public int size() { } private class Worker implements Callable, Closeable { - ByteBuffer bb = null; + ByteBuffer bb; long pos; SeekableByteChannel chan; @@ -146,8 +146,9 @@ public ByteBuffer call() throws IOException, ExecutionException, InterruptedExce } chan.position(pos); ByteBuffer b = this.bb; - // read until buffer is full, or EOF - while (chan.read(b) > 0 && !closing) {} + while (chan.read(b) > 0 && !closing) { + // read until buffer is full, or EOF + } reassignWorker(this); return b; } @@ -179,7 +180,7 @@ public SeekableByteChannelPrefetcher(SeekableByteChannelPrefetcherOptions opts, /** * Reads a sequence of bytes from this channel into the given buffer. * - * @param dst + * @param dst destination buffer */ @Override public int read(ByteBuffer dst) throws IOException { @@ -244,7 +245,7 @@ public long position() throws IOException { /** * Sets this channel's position. - *

    + * *

    Setting the position to a value that is greater than the current size * is legal but does not change the size of the entity. A later attempt to * read bytes at such a position will immediately return an end-of-file @@ -252,10 +253,10 @@ public long position() throws IOException { * the entity to grow to accommodate the new bytes; the values of any bytes * between the previous end-of-file and the newly-written bytes are * unspecified. - *

    + * *

    Setting the channel's position is not recommended when connected to - * an entity, typically a file, that is opened with the {@link - * StandardOpenOption#APPEND APPEND} option. When opened for + * an entity, typically a file, that is opened with the + * StandardOpenOption.APPEND option. When opened for * append, the position is first advanced to the end before writing. * * @param newPosition The new position, a non-negative integer counting @@ -309,14 +310,14 @@ public boolean isOpen() { /** * Closes this channel. - *

    + * *

    After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. - *

    + * *

    If this channel is already closed then invoking this method has no * effect. - *

    + * *

    This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without @@ -330,12 +331,14 @@ public void close() throws IOException { closing = true; exec.shutdown(); try { - while (true) synchronized (idleWorkers) { - if (idleWorkers.size() >= workerCount) { - // every thread is idle, we're done. - break; + while (true) { + synchronized (idleWorkers) { + if (idleWorkers.size() >= workerCount) { + // every thread is idle, we're done. + break; + } + idleWorkers.wait(60_000); } - idleWorkers.wait(60_000); } } catch (InterruptedException e) { System.out.println("Timed out while waiting for channels to close."); From c54a4f659f6e8b4a98df4dd77a4e21820c2f191d Mon Sep 17 00:00:00 2001 From: JP Martin Date: Thu, 1 Dec 2016 12:47:17 -0800 Subject: [PATCH 6/6] Handle InterruptedException --- .../storage/contrib/nio/SeekableByteChannelPrefetcher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java index 882fcbce63b1..6f77dd1d8307 100644 --- a/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java +++ b/google-cloud-contrib/google-cloud-nio/src/main/java/com/google/cloud/storage/contrib/nio/SeekableByteChannelPrefetcher.java @@ -191,6 +191,7 @@ public int read(ByteBuffer dst) throws IOException { try { src = fetch(position); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return 0; } catch (ExecutionException e) { throw new RuntimeException(e); @@ -341,11 +342,12 @@ public void close() throws IOException { } } } catch (InterruptedException e) { - System.out.println("Timed out while waiting for channels to close."); + Thread.currentThread().interrupt(); } try { exec.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); exec.shutdownNow(); } // Close all underlying channels