diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index b1056bbef52a..66190d13a91f 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -170,7 +170,7 @@ public static DruidExceptionBuilder defensive() /** * Build a "defensive" exception, this is an exception that should never actually be triggered, but we are - * throwing it inside of a defensive check. + * throwing it inside a defensive check. * * @return A builder for a defensive exception. */ diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index e41a362fda8e..0d0a7a40555d 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -21,8 +21,10 @@ import com.google.common.io.ByteStreams; import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.io.Closer; import java.io.File; import java.io.FileInputStream; @@ -33,22 +35,28 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; -final class FileWriteOutBytes extends WriteOutBytes +public final class FileWriteOutBytes extends WriteOutBytes { private final File file; private final FileChannel ch; private long writeOutBytes; - /** Purposely big-endian, for {@link #writeInt(int)} implementation */ - private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer + /** + * Purposely big-endian, for {@link #writeInt(int)} implementation. 
+ * Direct because there is a material difference in performance when writing direct buffers + */ + private final ByteBuffer buffer = ByteBuffer.allocateDirect(32768); // 32K page sized buffer - FileWriteOutBytes(File file, FileChannel ch) + FileWriteOutBytes(File file, FileChannel ch, Closer closer) { this.file = file; this.ch = ch; this.writeOutBytes = 0L; + closer.register( + () -> ByteBufferUtils.free(buffer) + ); } - + private void flushIfNeeded(int bytesNeeded) throws IOException { if (buffer.remaining() < bytesNeeded) { @@ -90,22 +98,29 @@ public int write(ByteBuffer src) throws IOException { int len = src.remaining(); flushIfNeeded(len); - while (src.remaining() > buffer.capacity()) { - int srcLimit = src.limit(); - try { - src.limit(src.position() + buffer.capacity()); - buffer.put(src); - writeOutBytes += buffer.capacity(); - flush(); - } - finally { - // IOException may occur in flush(), reset src limit to the original - src.limit(srcLimit); + if (len > buffer.remaining()) { + // if a flush was required, flushIfNeeded should have forced a flush. 
So, if the len is greater than + // our buffer size, we should just dump it straight to the file instead of buffering + Channels.writeFully(ch, src); + writeOutBytes += len; + } else { + while (src.remaining() > buffer.capacity()) { + int srcLimit = src.limit(); + try { + src.limit(src.position() + buffer.capacity()); + buffer.put(src); + writeOutBytes += buffer.capacity(); + flush(); + } + finally { + // IOException may occur in flush(), reset src limit to the original + src.limit(srcLimit); + } } + int remaining = src.remaining(); + buffer.put(src); + writeOutBytes += remaining; } - int remaining = src.remaining(); - buffer.put(src); - writeOutBytes += remaining; return len; } diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java new file mode 100644 index 000000000000..8d3d7bb76125 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.writeout; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.input.NullInputStream; +import org.apache.druid.error.DruidException; +import org.apache.druid.io.ByteBufferInputStream; +import org.apache.druid.java.util.common.io.Closer; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.function.Supplier; + +/** + * Lazily decides to use a tmpBuffer to act as WriteOutBytes, till more than certain threshold is reached. Once this is + * met, it switches to delegating all calls to a {@link WriteOutBytes} created by delegateSupplier. + *
<p>
+ * This is useful if the data stored in the {@link WriteOutBytes} is small enough that buffering the changes in memory + * would be faster than creating some {@link WriteOutBytes}. + */ +public class LazilyAllocatingHeapWriteOutBytes extends WriteOutBytes +{ + private final Supplier delegateSupplier; + + private ByteBuffer tmpBuffer = null; + private WriteOutBytes delegate = null; + private boolean open = true; + + public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplier, Closer closer) + { + this.delegateSupplier = delegateSupplier; + closer.register(() -> { + open = false; + tmpBuffer = null; + delegate = null; + }); + } + + @Override + public void writeInt(int v) throws IOException + { + checkOpen(); + final boolean useBuffer = ensureBytes(Integer.BYTES); + if (useBuffer) { + tmpBuffer.putInt(v); + return; + } + + delegate.writeInt(v); + } + + @Override + public long size() + { + if (delegate == null) { + return tmpBuffer == null ? 0 : tmpBuffer.position(); + } else { + return delegate.size(); + } + } + + @Override + public void writeTo(WritableByteChannel channel) throws IOException + { + checkOpen(); + if (delegate == null) { + if (tmpBuffer == null) { + return; + } + + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); + tmpBufCopy.flip(); + channel.write(tmpBufCopy); + } else { + delegate.writeTo(channel); + } + } + + @Override + public InputStream asInputStream() throws IOException + { + checkOpen(); + if (delegate == null) { + if (tmpBuffer == null) { + return new NullInputStream(); + } + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); + tmpBufCopy.flip(); + return new ByteBufferInputStream(tmpBufCopy); + } else { + return delegate.asInputStream(); + } + } + + @Override + public void readFully(long pos, ByteBuffer buffer) throws IOException + { + checkOpen(); + if (delegate == null) { + if (tmpBuffer == null) { + return; + } + + if (pos <= tmpBuffer.position()) { + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); 
+ tmpBufCopy.flip().position((int) pos); + if (tmpBufCopy.remaining() < buffer.remaining()) { + throw new BufferUnderflowException(); + } + tmpBufCopy.limit(tmpBufCopy.position() + buffer.remaining()); + buffer.put(tmpBufCopy); + } else { + throw new BufferOverflowException(); + } + } else { + delegate.readFully(pos, buffer); + } + } + + @Override + public void write(int b) throws IOException + { + checkOpen(); + final boolean useBuffer = ensureBytes(1); + if (useBuffer) { + tmpBuffer.put((byte) b); + return; + } + + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public int write(ByteBuffer src) throws IOException + { + checkOpen(); + final int numToWrite = src.remaining(); + final boolean useBuffer = ensureBytes(numToWrite); + if (useBuffer) { + tmpBuffer.put(src); + return numToWrite; + } + + return delegate.write(src); + } + + @Override + public boolean isOpen() + { + return open; + } + + private void checkOpen() + { + if (!isOpen()) { + throw DruidException.defensive("WriteOutBytes is already closed."); + } + } + + /** + * Ensures bytes are available, returns a boolean indicating if the buffer should be used or the delegate + * + * @param numBytes the number of bytes that need writing + * @return boolean true if the buffer should be used, false if the delegate should be used + * @throws IOException if an issue with IO occurs (can primarily happen when copying buffers) + */ + private boolean ensureBytes(int numBytes) throws IOException + { + if (tmpBuffer == null) { + if (delegate == null) { + if (numBytes < 128) { + tmpBuffer = ByteBuffer.allocate(128); + } else if (numBytes < 4096) { + tmpBuffer = ByteBuffer.allocate(4096); + } else { + // We are likely dealing with something that's already buffering stuff, so just switch delegate immediately + delegate = delegateSupplier.get(); + return false; + } + return true; + } else { + return false; + 
} + } + + if (numBytes < tmpBuffer.remaining()) { + return true; + } + + final ByteBuffer newBuf; + switch (tmpBuffer.capacity()) { + case 128: + if (numBytes < 4096 - tmpBuffer.position()) { + newBuf = ByteBuffer.allocate(4096); + break; + } + case 4096: + if (numBytes < 16384 - tmpBuffer.position()) { + newBuf = ByteBuffer.allocate(16384); + break; + } + default: + newBuf = null; + } + + if (newBuf == null) { + delegate = delegateSupplier.get(); + tmpBuffer.flip(); + delegate.write(tmpBuffer); + tmpBuffer = null; + return false; + } else { + tmpBuffer.flip(); + newBuf.put(tmpBuffer); + tmpBuffer = newBuf; + return true; + } + } + + @VisibleForTesting + ByteBuffer getTmpBuffer() + { + return tmpBuffer; + } + + @VisibleForTesting + WriteOutBytes getDelegate() + { + return delegate; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java new file mode 100644 index 000000000000..192aa9b61c3f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.writeout; + +import com.google.common.io.ByteStreams; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.IOE; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +final class LegacyFileWriteOutBytes extends WriteOutBytes +{ + private final File file; + private final FileChannel ch; + private long writeOutBytes; + + /** Purposely big-endian, for {@link #writeInt(int)} implementation */ + private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer + + LegacyFileWriteOutBytes(File file, FileChannel ch) + { + this.file = file; + this.ch = ch; + this.writeOutBytes = 0L; + } + + private void flushIfNeeded(int bytesNeeded) throws IOException + { + if (buffer.remaining() < bytesNeeded) { + flush(); + } + } + + @Override + public void flush() throws IOException + { + buffer.flip(); + try { + Channels.writeFully(ch, buffer); + } + catch (IOException e) { + throw new IOE(e, "Failed to write to file: %s. 
Current size of file: %d", file.getAbsolutePath(), writeOutBytes); + } + buffer.clear(); + } + + @Override + public void write(int b) throws IOException + { + flushIfNeeded(1); + buffer.put((byte) b); + writeOutBytes++; + } + + @Override + public void writeInt(int v) throws IOException + { + flushIfNeeded(Integer.BYTES); + buffer.putInt(v); + writeOutBytes += Integer.BYTES; + } + + @Override + public int write(ByteBuffer src) throws IOException + { + int len = src.remaining(); + flushIfNeeded(len); + while (src.remaining() > buffer.capacity()) { + int srcLimit = src.limit(); + try { + src.limit(src.position() + buffer.capacity()); + buffer.put(src); + writeOutBytes += buffer.capacity(); + flush(); + } + finally { + // IOException may occur in flush(), reset src limit to the original + src.limit(srcLimit); + } + } + int remaining = src.remaining(); + buffer.put(src); + writeOutBytes += remaining; + return len; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public long size() + { + return writeOutBytes; + } + + @Override + public void writeTo(WritableByteChannel channel) throws IOException + { + flush(); + ch.position(0); + try { + ByteStreams.copy(ch, channel); + } + finally { + ch.position(ch.size()); + } + } + + @Override + public void readFully(long pos, ByteBuffer buffer) throws IOException + { + if (pos < 0 || pos > writeOutBytes) { + throw new IAE("pos %d out of range [%d, %d]", pos, 0, writeOutBytes); + } + flush(); + ch.read(buffer, pos); + if (buffer.remaining() > 0) { + throw new BufferUnderflowException(); + } + } + + @Override + public InputStream asInputStream() throws IOException + { + flush(); + return new FileInputStream(file); + } + + @Override + public boolean isOpen() + { + return ch.isOpen(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java 
b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java new file mode 100644 index 000000000000..a01280bf18df --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.io.Closer; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; + +public final class LegacyTmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium +{ + private final File dir; + private final Closer closer = Closer.create(); + + LegacyTmpFileSegmentWriteOutMedium(File outDir) throws IOException + { + File tmpOutputFilesDir = new File(outDir, "tmpOutputFiles"); + FileUtils.mkdirp(tmpOutputFilesDir); + closer.register(() -> FileUtils.deleteDirectory(tmpOutputFilesDir)); + this.dir = tmpOutputFilesDir; + } + + @Override + public WriteOutBytes makeWriteOutBytes() throws IOException + { + File file = File.createTempFile("filePeon", null, dir); + FileChannel ch = FileChannel.open( + file.toPath(), + StandardOpenOption.READ, + StandardOpenOption.WRITE + ); + closer.register(file::delete); + closer.register(ch); + return new LegacyFileWriteOutBytes(file, ch); + } + + @Override + public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException + { + LegacyTmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new LegacyTmpFileSegmentWriteOutMedium(dir); + closer.register(tmpFileSegmentWriteOutMedium); + return tmpFileSegmentWriteOutMedium; + } + + @Override + public Closer getCloser() + { + return closer; + } + + @Override + public void close() throws IOException + { + closer.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java new file mode 100644 index 000000000000..c89627401dc9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.io.File; +import java.io.IOException; + +public final class LegacyTmpFileSegmentWriteOutMediumFactory implements SegmentWriteOutMediumFactory +{ + private static final LegacyTmpFileSegmentWriteOutMediumFactory INSTANCE = new LegacyTmpFileSegmentWriteOutMediumFactory(); + + @JsonCreator + public static LegacyTmpFileSegmentWriteOutMediumFactory instance() + { + return INSTANCE; + } + + private LegacyTmpFileSegmentWriteOutMediumFactory() + { + } + + @Override + public SegmentWriteOutMedium makeSegmentWriteOutMedium(File outDir) throws IOException + { + return new LegacyTmpFileSegmentWriteOutMedium(outDir); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java index 7c2af89b1b66..16efd07cb1bf 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java @@ -28,7 +28,7 @@ * SegmentWriteOutMedium is an 
umbrella "resource disposer" for temporary buffers (in the form of {@link WriteOutBytes}, * obtained by calling {@link #makeWriteOutBytes()} on the SegmentWriteOutMedium instance), that are used during new Druid * segment creation, and other resources (see {@link #getCloser()}). - * + *
<p>
* When SegmentWriteOutMedium is closed, all child WriteOutBytes couldn't be used anymore. */ public interface SegmentWriteOutMedium extends Closeable @@ -37,6 +37,7 @@ public interface SegmentWriteOutMedium extends Closeable * Creates a new empty {@link WriteOutBytes}, attached to this SegmentWriteOutMedium. When this SegmentWriteOutMedium is * closed, the returned WriteOutBytes couldn't be used anymore. */ + @SuppressWarnings("RedundantThrows") WriteOutBytes makeWriteOutBytes() throws IOException; /** @@ -45,7 +46,7 @@ public interface SegmentWriteOutMedium extends Closeable * using a shared {@link SegmentWriteOutMedium} but which control the complete lifecycle of the {@link WriteOutBytes} * which they require to free the backing resources when they are finished, rather than waiting until * {@link #close()} is called for this medium. - * + *
<p>
* The 'child' medium will be closed when {@link #close()} is called, if not called explicitly prior to closing this * medium. */ diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java index ae241b3187be..3e34af17f397 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java @@ -30,6 +30,7 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TmpFileSegmentWriteOutMediumFactory.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "tmpFile", value = TmpFileSegmentWriteOutMediumFactory.class), + @JsonSubTypes.Type(name = "legacyTmpFile", value = LegacyTmpFileSegmentWriteOutMediumFactory.class), @JsonSubTypes.Type(name = "offHeapMemory", value = OffHeapMemorySegmentWriteOutMediumFactory.class), @JsonSubTypes.Type(name = "onHeapMemory", value = OnHeapMemorySegmentWriteOutMediumFactory.class), }) @@ -39,6 +40,7 @@ static Set builtInFactories() { return ImmutableSet.of( TmpFileSegmentWriteOutMediumFactory.instance(), + LegacyTmpFileSegmentWriteOutMediumFactory.instance(), OffHeapMemorySegmentWriteOutMediumFactory.instance(), OnHeapMemorySegmentWriteOutMediumFactory.instance() ); diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 007b93aa04b7..58dc99198e3f 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -19,21 +19,51 @@ package org.apache.druid.segment.writeout; +import com.google.common.annotations.VisibleForTesting; +import 
org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Builds segment write out medium objects that are based on temporary files. Some analysis of usage of these + * objects shows that they periodically get used for very small things, where the overhead of creating a tmp file + * is actually very large. It would be best to go back and look at usages and try to make them lazy such that + * they only actually use a medium when they need it. But, in an attempt to get some benefits, we "shim" in the + * laziness by returning a heap-based WriteOutBytes that only falls back to making a tmp file when it actually + * fills up. 
+ */ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium { + private static final Logger log = new Logger(TmpFileSegmentWriteOutMedium.class); + + private final AtomicInteger filesCreated; + private final SortedMap sizeDistribution; + private int numLocallyCreated = 0; + private boolean root = false; + private final File dir; private final Closer closer = Closer.create(); TmpFileSegmentWriteOutMedium(File outDir) throws IOException { + this(outDir, new AtomicInteger(0), new ConcurrentSkipListMap<>()); + root = true; + } + + private TmpFileSegmentWriteOutMedium(File outDir, AtomicInteger filesCreated, SortedMap sizeDistribution) throws IOException + { + this.filesCreated = filesCreated; + this.sizeDistribution = sizeDistribution; File tmpOutputFilesDir = new File(outDir, "tmpOutputFiles"); FileUtils.mkdirp(tmpOutputFilesDir); closer.register(() -> FileUtils.deleteDirectory(tmpOutputFilesDir)); @@ -41,23 +71,44 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium } @Override - public WriteOutBytes makeWriteOutBytes() throws IOException + public WriteOutBytes makeWriteOutBytes() { - File file = File.createTempFile("filePeon", null, dir); - FileChannel ch = FileChannel.open( - file.toPath(), - StandardOpenOption.READ, - StandardOpenOption.WRITE + return new LazilyAllocatingHeapWriteOutBytes( + () -> { + final int i = filesCreated.incrementAndGet(); + ++numLocallyCreated; + File file; + FileChannel ch; + try { + file = File.createTempFile("filePeon", null, dir); + ch = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE); + } + catch (IOException e) { + throw DruidException.defensive(e, "Failed"); + } + if (i % 1000 == 0) { + log.info("Created [%,d] tmp files. 
[%s]", i, dir); + } + final FileWriteOutBytes retVal = new FileWriteOutBytes(file, ch, closer); + closer.register(file::delete); + closer.register(ch); + closer.register(() -> { + sizeDistribution.compute(retVal.size(), (key, val) -> val == null ? 1 : val + 1); + }); + return retVal; + }, + closer ); - closer.register(file::delete); - closer.register(ch); - return new FileWriteOutBytes(file, ch); } @Override public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException { - TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium(dir); + TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium( + dir, + filesCreated, + sizeDistribution + ); closer.register(tmpFileSegmentWriteOutMedium); return tmpFileSegmentWriteOutMedium; } @@ -71,6 +122,28 @@ public Closer getCloser() @Override public void close() throws IOException { + log.debug("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); + filesCreated.set(filesCreated.get() - numLocallyCreated); + numLocallyCreated = 0; closer.close(); + + if (root && log.isDebugEnabled()) { + log.debug("Size distribution of files:"); + for (Map.Entry entry : sizeDistribution.entrySet()) { + log.debug("%,15d => %,15d", entry.getKey(), entry.getValue()); + } + } + } + + @VisibleForTesting + int getFilesCreated() + { + return filesCreated.get(); + } + + @VisibleForTesting + int getNumLocallyCreated() + { + return numLocallyCreated; } } diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java index e1c2972b1485..95ab2f7b01dc 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java @@ -30,7 +30,7 @@ * WritableByteChannel} and {@link #writeInt(int)} append to the sequence. 
Methods {@link * #writeTo(WritableByteChannel)} and {@link #asInputStream()} allow to write the sequence somewhere else. {@link * #readFully} allows to access the sequence randomly. - * + *
<p>
* WriteOutBytes is a resource that is managed by {@link SegmentWriteOutMedium}, so it's own {@link #close()} method * does nothing. However WriteOutBytes should appear closed, i. e. {@link #isOpen()} returns false, after the parental * SegmentWriteOutMedium is closed. @@ -47,6 +47,12 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte */ public abstract long size(); + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + /** * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel. */ diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java index da6b3c481a5d..cd39c76c99d7 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.writeout; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.io.Closer; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -41,13 +42,13 @@ public void setUp() { mockFileChannel = EasyMock.mock(FileChannel.class); mockFile = EasyMock.mock(File.class); - fileWriteOutBytes = new FileWriteOutBytes(mockFile, mockFileChannel); + fileWriteOutBytes = new FileWriteOutBytes(mockFile, mockFileChannel, Closer.create()); } @Test - public void write4KiBIntsShouldNotFlush() throws IOException + public void write32KiBIntsShouldNotFlush() throws IOException { - // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + // Write 32KiB of ints and expect the write operation of the file channel will be triggered only once. 
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; @@ -56,13 +57,13 @@ public void write4KiBIntsShouldNotFlush() throws IOException return remaining; }).times(1); EasyMock.replay(mockFileChannel); - final int writeBytes = 4096; + final int writeBytes = 32768; final int numOfInt = writeBytes / Integer.BYTES; for (int i = 0; i < numOfInt; i++) { fileWriteOutBytes.writeInt(i); } - // no need to flush up to 4KiB - // the first byte after 4KiB will cause a flush + // no need to flush up to 32KiB + // the first byte after 32KiB will cause a flush fileWriteOutBytes.write(1); EasyMock.verify(mockFileChannel); } @@ -92,14 +93,14 @@ public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws I return remaining; }).times(1); EasyMock.replay(mockFileChannel); - ByteBuffer src = ByteBuffer.allocate(4096 + 1); + ByteBuffer src = ByteBuffer.allocate(32768 + 1); fileWriteOutBytes.write(src); Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); EasyMock.verify(mockFileChannel); } @Test - public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException + public void writeBufferLargerThanCapacityWritesDirectlyToFileShouldIncrementSizeCorrectly() throws IOException { EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { @@ -108,25 +109,16 @@ public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSize buffer.position(remaining); return remaining; }).once(); - EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) - .andThrow(new IOException()) - .once(); EasyMock.replay(mockFileChannel); - ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1); - try { - fileWriteOutBytes.write(src); - Assert.fail("IOException should have been thrown."); - } - catch (IOException e) { - // The second invocation to flush bytes fails. 
So the size should count what has already been put successfully - Assert.assertEquals(4096 * 2, fileWriteOutBytes.size()); - } + ByteBuffer src = ByteBuffer.allocate(32768 * 2 + 1); + fileWriteOutBytes.write(src); + Assert.assertEquals(32768 * 2 + 1, fileWriteOutBytes.size()); } @Test public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException { - ByteBuffer src = ByteBuffer.allocate(4096); + ByteBuffer src = ByteBuffer.allocate(32768); fileWriteOutBytes.write(src); Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); } @@ -146,11 +138,11 @@ public void sizeDoesNotFlush() throws IOException @Test public void testReadFullyWorks() throws IOException { - int fileSize = 4096; + int fileSize = 32768; int numOfInt = fileSize / Integer.BYTES; ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); ByteBuffer underlying = ByteBuffer.allocate(fileSize); - // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + // Write 32KiB of ints and expect the write operation of the file channel will be triggered only once. 
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; @@ -181,7 +173,7 @@ public void testReadFullyWorks() throws IOException @Test public void testReadFullyOutOfBoundsDoesnt() throws IOException { - int fileSize = 4096; + int fileSize = 32768; int numOfInt = fileSize / Integer.BYTES; ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); EasyMock.replay(mockFileChannel); @@ -191,7 +183,7 @@ public void testReadFullyOutOfBoundsDoesnt() throws IOException Assert.assertEquals(fileSize, fileWriteOutBytes.size()); destination.position(0); - Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(5000, destination)); + Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(33000, destination)); EasyMock.verify(mockFileChannel); } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java new file mode 100644 index 000000000000..32a5d09ba007 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.io.Closer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class LazilyAllocatingHeapWriteOutBytesTest +{ + private LazilyAllocatingHeapWriteOutBytes target; + private Closer closer; + private HeapByteBufferWriteOutBytes heapByteBufferWriteOutBytes; + + @Before + public void setUp() + { + closer = Closer.create(); + heapByteBufferWriteOutBytes = new HeapByteBufferWriteOutBytes(); + target = new LazilyAllocatingHeapWriteOutBytes( + () -> heapByteBufferWriteOutBytes, + closer + ); + } + + @Test + public void testWritingToBuffer() throws IOException + { + Assert.assertNull(target.getTmpBuffer()); + + target.write(ByteBuffer.allocate(512)); + Assert.assertNotNull(target.getTmpBuffer()); + Assert.assertEquals(4096, target.getTmpBuffer().limit()); + Assert.assertNull(target.getDelegate()); + + target.write(ByteBuffer.allocate(16385)); + Assert.assertNull(target.getTmpBuffer()); + Assert.assertNotNull(target.getDelegate()); + Assert.assertEquals(16385 + 512, target.getDelegate().size()); + } + + @Test + public void testClosingWriteOutBytes() throws IOException + { + Assert.assertNull(target.getTmpBuffer()); + + target.writeInt(5); + Assert.assertNotNull(target.getTmpBuffer()); + Assert.assertEquals(128, target.getTmpBuffer().limit()); + Assert.assertNull(target.getDelegate()); + + closer.close(); + + Assert.assertNull(target.getTmpBuffer()); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java new file mode 100644 index 000000000000..1ab2f5f785bd --- /dev/null +++ 
b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.IAE; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class LegacyFileWriteOutBytesTest +{ + private LegacyFileWriteOutBytes fileWriteOutBytes; + private FileChannel mockFileChannel; + private File mockFile; + + @Before + public void setUp() + { + mockFileChannel = EasyMock.mock(FileChannel.class); + mockFile = EasyMock.mock(File.class); + fileWriteOutBytes = new LegacyFileWriteOutBytes(mockFile, mockFileChannel); + } + + @Test + public void write4KiBIntsShouldNotFlush() throws IOException + { + // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. 
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).times(1); + EasyMock.replay(mockFileChannel); + final int writeBytes = 4096; + final int numOfInt = writeBytes / Integer.BYTES; + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + // no need to flush up to 4KiB + // the first byte after 4KiB will cause a flush + fileWriteOutBytes.write(1); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeShouldIncrementSize() throws IOException + { + fileWriteOutBytes.write(1); + Assert.assertEquals(1, fileWriteOutBytes.size()); + } + + @Test + public void writeIntShouldIncrementSize() throws IOException + { + fileWriteOutBytes.writeInt(1); + Assert.assertEquals(4, fileWriteOutBytes.size()); + } + + @Test + public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).times(1); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 + 1); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).once(); + 
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new IOException()) + .once(); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1); + try { + fileWriteOutBytes.write(src); + Assert.fail("IOException should have been thrown."); + } + catch (IOException e) { + // The second invocation to flush bytes fails. So the size should count what has already been put successfully + Assert.assertEquals(4096 * 2, fileWriteOutBytes.size()); + } + } + + @Test + public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + ByteBuffer src = ByteBuffer.allocate(4096); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + } + @Test + public void sizeDoesNotFlush() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new AssertionError("file channel should not have been written to.")); + EasyMock.replay(mockFileChannel); + long size = fileWriteOutBytes.size(); + Assert.assertEquals(0, size); + fileWriteOutBytes.writeInt(10); + size = fileWriteOutBytes.size(); + Assert.assertEquals(4, size); + } + + @Test + public void testReadFullyWorks() throws IOException + { + int fileSize = 4096; + int numOfInt = fileSize / Integer.BYTES; + ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); + ByteBuffer underlying = ByteBuffer.allocate(fileSize); + // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. 
+ EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + underlying.position(0); + underlying.put(buffer); + return 0; + }).times(1); + EasyMock.expect(mockFileChannel.read(EasyMock.eq(destination), EasyMock.eq(100L * Integer.BYTES))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + long pos = (long) EasyMock.getCurrentArguments()[1]; + buffer.putInt(underlying.getInt((int) pos)); + return Integer.BYTES; + }).times(1); + EasyMock.replay(mockFileChannel); + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + Assert.assertEquals(underlying.capacity(), fileWriteOutBytes.size()); + + destination.position(0); + fileWriteOutBytes.readFully(100L * Integer.BYTES, destination); + destination.position(0); + Assert.assertEquals(100, destination.getInt()); + EasyMock.verify(mockFileChannel); + } + + @Test + public void testReadFullyOutOfBoundsDoesnt() throws IOException + { + int fileSize = 4096; + int numOfInt = fileSize / Integer.BYTES; + ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); + EasyMock.replay(mockFileChannel); + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + Assert.assertEquals(fileSize, fileWriteOutBytes.size()); + + destination.position(0); + Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(5000, destination)); + EasyMock.verify(mockFileChannel); + } + + @Test + public void testIOExceptionHasFileInfo() throws Exception + { + IOException cause = new IOException("Too many bytes"); + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))).andThrow(cause); + EasyMock.expect(mockFile.getAbsolutePath()).andReturn("/tmp/file"); + EasyMock.replay(mockFileChannel, mockFile); + fileWriteOutBytes.writeInt(10); + fileWriteOutBytes.write(new byte[30]); + IOException actual = Assert.assertThrows(IOException.class, () 
-> fileWriteOutBytes.flush());
+    Assert.assertEquals(String.valueOf(actual.getCause()), cause, actual.getCause());
+    Assert.assertEquals("Failed to write to file: /tmp/file. Current size of file: 34", actual.getMessage());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java
new file mode 100644
index 000000000000..11b1acdda24c
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.writeout;
+
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TmpFileSegmentWriteOutMediumTest
+{
+  private volatile TmpFileSegmentWriteOutMedium writeOutMedium;
+  private ExecutorService executorService;
+
+  @Before
+  public void setUp() throws IOException
+  {
+    writeOutMedium = new TmpFileSegmentWriteOutMedium(FileUtils.createTempDir());
+    executorService = Execs.multiThreaded(3, "writeOutMedium-%d");
+  }
+
+  @After
+  public void tearDown()
+  {
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void testFileCount() throws InterruptedException
+  {
+    final int threadCount = 3;
+    final CountDownLatch latch = new CountDownLatch(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      executorService.submit(
+          () -> {
+            WriteOutBytes writeOutBytes = writeOutMedium.makeWriteOutBytes();
+            Assert.assertEquals(0, writeOutMedium.getNumLocallyCreated());
+            try {
+              latch.countDown();
+              latch.await();
+              ByteBuffer allocate = ByteBuffer.allocate(4096);
+              writeOutBytes.write(allocate);
+            }
+            catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+            Assert.assertEquals(1, writeOutMedium.getNumLocallyCreated());
+          }
+      );
+    }
+    executorService.shutdown(); Assert.assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS));
+    Assert.assertEquals(threadCount, writeOutMedium.getFilesCreated());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java
index 9b15bfd0fa21..267707d4870a 100644
--- a/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java @@ -119,4 +119,14 @@ public void testReadFullyEmptyAtTheEnd() throws IOException writeOutBytes.write('1'); writeOutBytes.readFully(1, ByteBuffer.allocate(0)); } + + @Test + public void testWriteWithOffset() throws IOException + { + WriteOutBytes writeOutBytes = segmentWriteOutMedium.makeWriteOutBytes(); + writeOutBytes.write(new byte[] {0x01, 0x02, 0x03, 0x04}, 1, 2); + ByteBuffer destination = ByteBuffer.allocate(2); + writeOutBytes.readFully(0, destination); + Assert.assertArrayEquals(new byte[] {0x02, 0x03}, destination.array()); + } }