From aa5ac2f8ee26f72cd029e4c1ad39e7631b6fc1bd Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 8 Jul 2024 11:23:27 +0530 Subject: [PATCH 01/10] WriteOutBytes improvements --- .../apache/druid/error/DruidException.java | 13 +- .../segment/writeout/FileWriteOutBytes.java | 46 ++-- .../LazilyAllocatingHeapWriteOutBytes.java | 218 ++++++++++++++++++ .../writeout/SegmentWriteOutMedium.java | 5 +- .../TmpFileSegmentWriteOutMedium.java | 67 +++++- .../druid/segment/writeout/WriteOutBytes.java | 8 +- 6 files changed, 326 insertions(+), 31 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index a04f3f6512cf..3e2962c311c3 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -167,7 +167,7 @@ public static DruidExceptionBuilder defensive() /** * Build a "defensive" exception, this is an exception that should never actually be triggered, but we are - * throwing it inside of a defensive check. + * throwing it inside a defensive check. * * @return A builder for a defensive exception. */ @@ -176,6 +176,17 @@ public static DruidException defensive(String format, Object... args) return defensive().build(format, args); } + /** + * Build a "defensive" exception, this is an exception that should never actually be triggered, but we are + * throwing it inside a defensive check. + * + * @return A builder for a defensive exception. + */ + public static DruidException defensive(Throwable cause, String format, Object... args) + { + return defensive().build(cause, format, args); + } + /** * Build a "defensive" exception, this is an exception that should never actually be triggered. Throw to * allow messages to be seen by developers diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index e41a362fda8e..135db827c39c 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -33,14 +33,17 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; -final class FileWriteOutBytes extends WriteOutBytes +public final class FileWriteOutBytes extends WriteOutBytes { private final File file; private final FileChannel ch; private long writeOutBytes; - /** Purposely big-endian, for {@link #writeInt(int)} implementation */ - private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer + /** + * Purposely big-endian, for {@link #writeInt(int)} implementation. + * Direct because there is a material difference in performance when writing direct buffers + */ + private final ByteBuffer buffer = ByteBuffer.allocateDirect(32768); // 32K page sized buffer FileWriteOutBytes(File file, FileChannel ch) { @@ -48,7 +51,7 @@ final class FileWriteOutBytes extends WriteOutBytes this.ch = ch; this.writeOutBytes = 0L; } - + private void flushIfNeeded(int bytesNeeded) throws IOException { if (buffer.remaining() < bytesNeeded) { @@ -90,22 +93,29 @@ public int write(ByteBuffer src) throws IOException { int len = src.remaining(); flushIfNeeded(len); - while (src.remaining() > buffer.capacity()) { - int srcLimit = src.limit(); - try { - src.limit(src.position() + buffer.capacity()); - buffer.put(src); - writeOutBytes += buffer.capacity(); - flush(); - } - finally { - // IOException may occur in flush(), reset src limit to the original - src.limit(srcLimit); + if (len > buffer.remaining()) { + // if a flush was required, flushIfNeeded should have forced a flush. So, if the len is greater than + // our buffer size, we should just dump it straight to the file instead of buffering + Channels.writeFully(ch, src); + writeOutBytes += len; + } else { + while (src.remaining() > buffer.capacity()) { + int srcLimit = src.limit(); + try { + src.limit(src.position() + buffer.capacity()); + buffer.put(src); + writeOutBytes += buffer.capacity(); + flush(); + } + finally { + // IOException may occur in flush(), reset src limit to the original + src.limit(srcLimit); + } } + int remaining = src.remaining(); + buffer.put(src); + writeOutBytes += remaining; } - int remaining = src.remaining(); - buffer.put(src); - writeOutBytes += remaining; return len; } diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java new file mode 100644 index 000000000000..66d73e525574 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.commons.io.input.NullInputStream; +import org.apache.druid.io.ByteBufferInputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.function.Supplier; + +public class LazilyAllocatingHeapWriteOutBytes extends WriteOutBytes +{ + private final Supplier delegateSupplier; + + private ByteBuffer tmpBuffer = null; + private WriteOutBytes delegate = null; + + public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplier) + { + this.delegateSupplier = delegateSupplier; + } + + @Override + public void writeInt(int v) throws IOException + { + final boolean useBuffer = ensureBytes(Integer.BYTES); + if (useBuffer) { + tmpBuffer.putInt(v); + return; + } + + delegate.writeInt(v); + } + + @Override + public long size() + { + if (delegate == null) { + return tmpBuffer == null ? 0 : tmpBuffer.position(); + } else { + return delegate.size(); + } + } + + @Override + public void writeTo(WritableByteChannel channel) throws IOException + { + if (delegate == null) { + if (tmpBuffer == null) { + return; + } + + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); + tmpBufCopy.flip(); + channel.write(tmpBufCopy); + } else { + delegate.writeTo(channel); + } + } + + @Override + public InputStream asInputStream() throws IOException + { + if (delegate == null) { + if (tmpBuffer == null) { + return new NullInputStream(); + } + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); + tmpBufCopy.flip(); + return new ByteBufferInputStream(tmpBufCopy); + } else { + return delegate.asInputStream(); + } + } + + @Override + public void readFully(long pos, ByteBuffer buffer) throws IOException + { + if (delegate == null) { + if (tmpBuffer == null) { + return; + } + + if (pos < tmpBuffer.position()) { + final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); + tmpBufCopy.flip().position((int) pos); + if (tmpBufCopy.remaining() < buffer.remaining()) { + throw new BufferUnderflowException(); + } + tmpBufCopy.limit(tmpBufCopy.position() + buffer.remaining()); + buffer.put(tmpBufCopy); + } else { + throw new BufferOverflowException(); + } + } else { + delegate.readFully(pos, buffer); + } + } + + @Override + public void write(int b) throws IOException + { + final boolean useBuffer = ensureBytes(1); + if (useBuffer) { + tmpBuffer.put((byte) b); + return; + } + + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public int write(ByteBuffer src) throws IOException + { + final int numToWrite = src.remaining(); + final boolean useBuffer = ensureBytes(numToWrite); + if (useBuffer) { + tmpBuffer.put(src); + return numToWrite; + } + + return delegate.write(src); + } + + @Override + public boolean isOpen() + { + return delegate == null ? true : delegate.isOpen(); + } + + /** + * Ensures bytes are available, returns a boolean indicating if the buffer should be used or the delegate + * + * @param numBytes the number of bytes that need writing + * @return boolean true if the buffer should be used, false if the delegate should be used + * @throws IOException if an issue with IO occurs (can primarily happen when copying buffers) + */ + private boolean ensureBytes(int numBytes) throws IOException + { + if (tmpBuffer == null) { + if (delegate == null) { + if (numBytes < 128) { + tmpBuffer = ByteBuffer.allocate(128); + } else if (numBytes < 4096) { + tmpBuffer = ByteBuffer.allocate(4096); + } else { + // We are likely dealing with something that's already buffering stuff, so just switch delegate immediately + delegate = delegateSupplier.get(); + return false; + } + return true; + } else { + return false; + } + } + + if (numBytes < tmpBuffer.remaining()) { + return true; + } + + final ByteBuffer newBuf; + switch (tmpBuffer.capacity()) { + case 128: + if (numBytes < 4096 - tmpBuffer.position()) { + newBuf = ByteBuffer.allocate(4096); + break; + } + case 4096: + if (numBytes < 16384 - tmpBuffer.position()) { + newBuf = ByteBuffer.allocate(16384); + break; + } + default: + newBuf = null; + } + + if (newBuf == null) { + delegate = delegateSupplier.get(); + tmpBuffer.flip(); + delegate.write(tmpBuffer); + tmpBuffer = null; + return false; + } else { + tmpBuffer.flip(); + newBuf.put(tmpBuffer); + tmpBuffer = newBuf; + return true; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java index 7c2af89b1b66..16efd07cb1bf 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java @@ -28,7 +28,7 @@ * SegmentWriteOutMedium is an umbrella "resource disposer" for temporary buffers (in the form of {@link WriteOutBytes}, * obtained by calling {@link #makeWriteOutBytes()} on the SegmentWriteOutMedium instance), that are used during new Druid * segment creation, and other resources (see {@link #getCloser()}). - * + *

* When SegmentWriteOutMedium is closed, all child WriteOutBytes couldn't be used anymore. */ public interface SegmentWriteOutMedium extends Closeable @@ -37,6 +37,7 @@ public interface SegmentWriteOutMedium extends Closeable * Creates a new empty {@link WriteOutBytes}, attached to this SegmentWriteOutMedium. When this SegmentWriteOutMedium is * closed, the returned WriteOutBytes couldn't be used anymore. */ + @SuppressWarnings("RedundantThrows") WriteOutBytes makeWriteOutBytes() throws IOException; /** @@ -45,7 +46,7 @@ public interface SegmentWriteOutMedium extends Closeable * using a shared {@link SegmentWriteOutMedium} but which control the complete lifecycle of the {@link WriteOutBytes} * which they require to free the backing resources when they are finished, rather than waiting until * {@link #close()} is called for this medium. - * + *

* The 'child' medium will be closed when {@link #close()} is called, if not called explicitly prior to closing this * medium. */ diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 007b93aa04b7..5dcb5678082d 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -19,21 +19,47 @@ package org.apache.druid.segment.writeout; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Builds segment write out medium objects that are based on temporary files. Some analysis of usage of these + * objects shows that they periodically get used for very small things, where the overhead of creating a tmp file + * is actually very large. It would be best to go back and look at usages and try to make them lazy such that + * they only actually use a medium when they need it. But, in an attempt to get some benefits, we "shim" in the + * laziness by returning a heap-based WriteOutBytes that only falls back to making a tmp file when it actually + * fills up. + */ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium { + private static final Logger log = new Logger(TmpFileSegmentWriteOutMedium.class); + + private final AtomicInteger filesCreated; + private final SortedMap sizeDistribution; + private int numLocallyCreated = 0; + private final File dir; private final Closer closer = Closer.create(); TmpFileSegmentWriteOutMedium(File outDir) throws IOException { + this(outDir, new AtomicInteger(0), new ConcurrentSkipListMap<>()); + } + + private TmpFileSegmentWriteOutMedium(File outDir, AtomicInteger filesCreated, SortedMap sizeDistribution) throws IOException + { + this.filesCreated = filesCreated; + this.sizeDistribution = sizeDistribution; File tmpOutputFilesDir = new File(outDir, "tmpOutputFiles"); FileUtils.mkdirp(tmpOutputFilesDir); closer.register(() -> FileUtils.deleteDirectory(tmpOutputFilesDir)); @@ -43,21 +69,41 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium @Override public WriteOutBytes makeWriteOutBytes() throws IOException { - File file = File.createTempFile("filePeon", null, dir); - FileChannel ch = FileChannel.open( - file.toPath(), - StandardOpenOption.READ, - StandardOpenOption.WRITE + return new LazilyAllocatingHeapWriteOutBytes( + () -> { + final int i = filesCreated.incrementAndGet(); + ++numLocallyCreated; + File file; + FileChannel ch; + try { + file = File.createTempFile("filePeon", null, dir); + ch = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE); + } + catch (IOException e) { + throw DruidException.defensive(e, "Failed"); + } + if (i % 1000 == 0) { + log.info("Created [%,d] tmp files. [%s]", i, dir); + } + final FileWriteOutBytes retVal = new FileWriteOutBytes(file, ch); + closer.register(file::delete); + closer.register(ch); + closer.register(() -> { + sizeDistribution.compute(retVal.size(), (key, val) -> val == null ? 1 : val + 1); + }); + return retVal; + } ); - closer.register(file::delete); - closer.register(ch); - return new FileWriteOutBytes(file, ch); } @Override public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException { - TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium(dir); + TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium( + dir, + filesCreated, + sizeDistribution + ); closer.register(tmpFileSegmentWriteOutMedium); return tmpFileSegmentWriteOutMedium; } @@ -71,6 +117,9 @@ public Closer getCloser() @Override public void close() throws IOException { + log.debug("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); + filesCreated.set(filesCreated.get() - numLocallyCreated); + numLocallyCreated = 0; closer.close(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java index e1c2972b1485..95ab2f7b01dc 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java @@ -30,7 +30,7 @@ * WritableByteChannel} and {@link #writeInt(int)} append to the sequence. Methods {@link * #writeTo(WritableByteChannel)} and {@link #asInputStream()} allow to write the sequence somewhere else. {@link * #readFully} allows to access the sequence randomly. - * + *

* WriteOutBytes is a resource that is managed by {@link SegmentWriteOutMedium}, so it's own {@link #close()} method * does nothing. However WriteOutBytes should appear closed, i. e. {@link #isOpen()} returns false, after the parental * SegmentWriteOutMedium is closed. @@ -47,6 +47,12 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte */ public abstract long size(); + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + /** * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel. */ From 6479a5eda610ce6d163af0250797421d5d2c252b Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 13 Jul 2024 22:59:10 +0530 Subject: [PATCH 02/10] Add tests --- .../TmpFileSegmentWriteOutMedium.java | 14 +++++++-- .../writeout/FileWriteOutBytesTest.java | 29 +++++++------------ .../segment/writeout/WriteOutBytesTest.java | 10 +++++++ 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 5dcb5678082d..1b9b7dac5ea6 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; +import java.util.Map; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +48,7 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium private final AtomicInteger filesCreated; private final SortedMap sizeDistribution; private int numLocallyCreated = 0; + private boolean root = false; private final File dir; private final Closer closer = Closer.create(); @@ -54,6 +56,7 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium TmpFileSegmentWriteOutMedium(File outDir) throws IOException { this(outDir, new AtomicInteger(0), new ConcurrentSkipListMap<>()); + root = true; } private TmpFileSegmentWriteOutMedium(File outDir, AtomicInteger filesCreated, SortedMap sizeDistribution) throws IOException @@ -67,7 +70,7 @@ private TmpFileSegmentWriteOutMedium(File outDir, AtomicInteger filesCreated, So } @Override - public WriteOutBytes makeWriteOutBytes() throws IOException + public WriteOutBytes makeWriteOutBytes() { return new LazilyAllocatingHeapWriteOutBytes( () -> { @@ -117,9 +120,16 @@ public Closer getCloser() @Override public void close() throws IOException { - log.debug("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); + log.info("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); filesCreated.set(filesCreated.get() - numLocallyCreated); numLocallyCreated = 0; closer.close(); + + if (root) { + log.info("Size distribution of files:"); + for (Map.Entry entry : sizeDistribution.entrySet()) { + log.info("%,15d => %,15d", entry.getKey(), entry.getValue()); + } + } } } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java index da6b3c481a5d..374c84fe7e07 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java @@ -56,7 +56,7 @@ public void write4KiBIntsShouldNotFlush() throws IOException return remaining; }).times(1); EasyMock.replay(mockFileChannel); - final int writeBytes = 4096; + final int writeBytes = 32768; final int numOfInt = writeBytes / Integer.BYTES; for (int i = 0; i < numOfInt; i++) { fileWriteOutBytes.writeInt(i); @@ -92,14 +92,14 @@ public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws I return remaining; }).times(1); EasyMock.replay(mockFileChannel); - ByteBuffer src = ByteBuffer.allocate(4096 + 1); + ByteBuffer src = ByteBuffer.allocate(32768 + 1); fileWriteOutBytes.write(src); Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); EasyMock.verify(mockFileChannel); } @Test - public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException + public void writeBufferLargerThanCapacityWritesDirectlyToFileShouldIncrementSizeCorrectly() throws IOException { EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { @@ -108,25 +108,16 @@ public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSize buffer.position(remaining); return remaining; }).once(); - EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) - .andThrow(new IOException()) - .once(); EasyMock.replay(mockFileChannel); - ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1); - try { - fileWriteOutBytes.write(src); - Assert.fail("IOException should have been thrown."); - } - catch (IOException e) { - // The second invocation to flush bytes fails. So the size should count what has already been put successfully - Assert.assertEquals(4096 * 2, fileWriteOutBytes.size()); - } + ByteBuffer src = ByteBuffer.allocate(32768 * 2 + 1); + fileWriteOutBytes.write(src); + Assert.assertEquals(32768 * 2 + 1, fileWriteOutBytes.size()); } @Test public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException { - ByteBuffer src = ByteBuffer.allocate(4096); + ByteBuffer src = ByteBuffer.allocate(32768); fileWriteOutBytes.write(src); Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); } @@ -146,7 +137,7 @@ public void sizeDoesNotFlush() throws IOException @Test public void testReadFullyWorks() throws IOException { - int fileSize = 4096; + int fileSize = 32768; int numOfInt = fileSize / Integer.BYTES; ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); ByteBuffer underlying = ByteBuffer.allocate(fileSize); @@ -181,7 +172,7 @@ public void testReadFullyWorks() throws IOException @Test public void testReadFullyOutOfBoundsDoesnt() throws IOException { - int fileSize = 4096; + int fileSize = 32768; int numOfInt = fileSize / Integer.BYTES; ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); EasyMock.replay(mockFileChannel); @@ -191,7 +182,7 @@ public void testReadFullyOutOfBoundsDoesnt() throws IOException Assert.assertEquals(fileSize, fileWriteOutBytes.size()); destination.position(0); - Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(5000, destination)); + Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(33000, destination)); EasyMock.verify(mockFileChannel); } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java index 9b15bfd0fa21..267707d4870a 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/WriteOutBytesTest.java @@ -119,4 +119,14 @@ public void testReadFullyEmptyAtTheEnd() throws IOException writeOutBytes.write('1'); writeOutBytes.readFully(1, ByteBuffer.allocate(0)); } + + @Test + public void testWriteWithOffset() throws IOException + { + WriteOutBytes writeOutBytes = segmentWriteOutMedium.makeWriteOutBytes(); + writeOutBytes.write(new byte[] {0x01, 0x02, 0x03, 0x04}, 1, 2); + ByteBuffer destination = ByteBuffer.allocate(2); + writeOutBytes.readFully(0, destination); + Assert.assertArrayEquals(new byte[] {0x02, 0x03}, destination.array()); + } } From b8bae7667f3a8c46c817c107717e41d95c00aec3 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 13 Jul 2024 23:36:56 +0530 Subject: [PATCH 03/10] Handle closing better --- .../segment/writeout/FileWriteOutBytes.java | 10 ++++++- .../LazilyAllocatingHeapWriteOutBytes.java | 26 +++++++++++++++++-- .../TmpFileSegmentWriteOutMedium.java | 5 ++-- .../writeout/FileWriteOutBytesTest.java | 3 ++- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index 135db827c39c..ed7b8c4a6a5b 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -21,8 +21,10 @@ import com.google.common.io.ByteStreams; import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.io.Closer; import java.io.File; import java.io.FileInputStream; @@ -45,11 +47,17 @@ public final class FileWriteOutBytes extends WriteOutBytes */ private final ByteBuffer buffer = ByteBuffer.allocateDirect(32768); // 32K page sized buffer - FileWriteOutBytes(File file, FileChannel ch) + FileWriteOutBytes(File file, FileChannel ch, Closer closer) { this.file = file; this.ch = ch; this.writeOutBytes = 0L; + closer.register( + () -> { + ByteBufferUtils.free(buffer); + buffer.clear(); + } + ); } private void flushIfNeeded(int bytesNeeded) throws IOException diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java index 66d73e525574..a3251bdd6d9e 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java @@ -20,7 +20,9 @@ package org.apache.druid.segment.writeout; import org.apache.commons.io.input.NullInputStream; +import org.apache.druid.error.DruidException; import org.apache.druid.io.ByteBufferInputStream; +import org.apache.druid.java.util.common.io.Closer; import java.io.IOException; import java.io.InputStream; @@ -36,15 +38,23 @@ public class LazilyAllocatingHeapWriteOutBytes extends WriteOutBytes private ByteBuffer tmpBuffer = null; private WriteOutBytes delegate = null; + boolean open = true; - public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplier) + public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplier, Closer closer) { this.delegateSupplier = delegateSupplier; + closer.register(() -> { + open = false; + if (tmpBuffer != null) { + tmpBuffer.clear(); + } + }); } @Override public void writeInt(int v) throws IOException { + checkOpen(); final boolean useBuffer = ensureBytes(Integer.BYTES); if (useBuffer) { tmpBuffer.putInt(v); @@ -67,6 +77,7 @@ public long size() @Override public void writeTo(WritableByteChannel channel) throws IOException { + checkOpen(); if (delegate == null) { if (tmpBuffer == null) { return; @@ -83,6 +94,7 @@ public void writeTo(WritableByteChannel channel) throws IOException @Override public InputStream asInputStream() throws IOException { + checkOpen(); if (delegate == null) { if (tmpBuffer == null) { return new NullInputStream(); @@ -98,6 +110,7 @@ public InputStream asInputStream() throws IOException @Override public void readFully(long pos, ByteBuffer buffer) throws IOException { + checkOpen(); if (delegate == null) { if (tmpBuffer == null) { return; @@ -122,6 +135,7 @@ public void readFully(long pos, ByteBuffer buffer) throws IOException @Override public void write(int b) throws IOException { + checkOpen(); final boolean useBuffer = ensureBytes(1); if (useBuffer) { tmpBuffer.put((byte) b); @@ -140,6 +154,7 @@ public void write(byte[] b, int off, int len) throws IOException @Override public int write(ByteBuffer src) throws IOException { + checkOpen(); final int numToWrite = src.remaining(); final boolean useBuffer = ensureBytes(numToWrite); if (useBuffer) { @@ -153,7 +168,14 @@ public int write(ByteBuffer src) throws IOException @Override public boolean isOpen() { - return delegate == null ? true : delegate.isOpen(); + return open; + } + + private void checkOpen() + { + if (!isOpen()) { + throw DruidException.defensive("WriteOutBytes is already closed."); + } } /** diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 1b9b7dac5ea6..8c7254f43581 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -88,14 +88,15 @@ public WriteOutBytes makeWriteOutBytes() if (i % 1000 == 0) { log.info("Created [%,d] tmp files. [%s]", i, dir); } - final FileWriteOutBytes retVal = new FileWriteOutBytes(file, ch); + final FileWriteOutBytes retVal = new FileWriteOutBytes(file, ch, closer); closer.register(file::delete); closer.register(ch); closer.register(() -> { sizeDistribution.compute(retVal.size(), (key, val) -> val == null ? 1 : val + 1); }); return retVal; - } + }, + closer ); } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java index 374c84fe7e07..50355438177b 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.writeout; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.io.Closer; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -41,7 +42,7 @@ public void setUp() { mockFileChannel = EasyMock.mock(FileChannel.class); mockFile = EasyMock.mock(File.class); - fileWriteOutBytes = new FileWriteOutBytes(mockFile, mockFileChannel); + fileWriteOutBytes = new FileWriteOutBytes(mockFile, mockFileChannel, Closer.create()); } @Test From 8b1bc68bc9df0833602dfa9863beac514bbc900b Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 13 Jul 2024 23:58:57 +0530 Subject: [PATCH 04/10] Fix tests --- .../segment/writeout/LazilyAllocatingHeapWriteOutBytes.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java index a3251bdd6d9e..122d9e6f3c28 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java @@ -38,7 +38,7 @@ public class LazilyAllocatingHeapWriteOutBytes extends WriteOutBytes private ByteBuffer tmpBuffer = null; private WriteOutBytes delegate = null; - boolean open = true; + private boolean open = true; public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplier, Closer closer) { @@ -116,7 +116,7 @@ public void readFully(long pos, ByteBuffer buffer) throws IOException return; } - if (pos < tmpBuffer.position()) { + if (pos <= tmpBuffer.position()) { final ByteBuffer tmpBufCopy = tmpBuffer.asReadOnlyBuffer(); tmpBufCopy.flip().position((int) pos); if (tmpBufCopy.remaining() < buffer.remaining()) { From a938127df8126177b090a34dab9cb51a8f07ea59 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sun, 14 Jul 2024 20:47:08 +0530 Subject: [PATCH 05/10] Add tests --- .../segment/writeout/FileWriteOutBytes.java | 5 +- .../LazilyAllocatingHeapWriteOutBytes.java | 25 +++++- .../writeout/FileWriteOutBytesTest.java | 10 +-- ...LazilyAllocatingHeapWriteOutBytesTest.java | 77 +++++++++++++++++++ 4 files changed, 105 insertions(+), 12 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index ed7b8c4a6a5b..0d0a7a40555d 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -53,10 +53,7 @@ public final class FileWriteOutBytes extends WriteOutBytes this.ch = ch; this.writeOutBytes = 0L; closer.register( - () -> { - ByteBufferUtils.free(buffer); - buffer.clear(); - } + () -> ByteBufferUtils.free(buffer) ); } diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java index 122d9e6f3c28..8d3d7bb76125 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytes.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.writeout; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.input.NullInputStream; import org.apache.druid.error.DruidException; import org.apache.druid.io.ByteBufferInputStream; @@ -32,6 +33,13 @@ import java.nio.channels.WritableByteChannel; import java.util.function.Supplier; +/** + * Lazily decides to use a tmpBuffer to act as WriteOutBytes, till more than certain threshold is reached. Once this is + * met, it switches to delegating all calls to a {@link WriteOutBytes} created by delegateSupplier. + *

+ * This is useful if the data stored in the {@link WriteOutBytes} is small enough that buffering the changes in memory + * would be faster than creating some {@link WriteOutBytes}. + */ public class LazilyAllocatingHeapWriteOutBytes extends WriteOutBytes { private final Supplier delegateSupplier; @@ -45,9 +53,8 @@ public LazilyAllocatingHeapWriteOutBytes(Supplier delegateSupplie this.delegateSupplier = delegateSupplier; closer.register(() -> { open = false; - if (tmpBuffer != null) { - tmpBuffer.clear(); - } + tmpBuffer = null; + delegate = null; }); } @@ -237,4 +244,16 @@ private boolean ensureBytes(int numBytes) throws IOException return true; } } + + @VisibleForTesting + ByteBuffer getTmpBuffer() + { + return tmpBuffer; + } + + @VisibleForTesting + WriteOutBytes getDelegate() + { + return delegate; + } } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java index 50355438177b..cd39c76c99d7 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java @@ -46,9 +46,9 @@ public void setUp() } @Test - public void write4KiBIntsShouldNotFlush() throws IOException + public void write32KiBIntsShouldNotFlush() throws IOException { - // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + // Write 32KiB of ints and expect the write operation of the file channel will be triggered only once. EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; @@ -62,8 +62,8 @@ public void write4KiBIntsShouldNotFlush() throws IOException for (int i = 0; i < numOfInt; i++) { fileWriteOutBytes.writeInt(i); } - // no need to flush up to 4KiB - // the first byte after 4KiB will cause a flush + // no need to flush up to 32KiB + // the first byte after 32KiB will cause a flush fileWriteOutBytes.write(1); EasyMock.verify(mockFileChannel); } @@ -142,7 +142,7 @@ public void testReadFullyWorks() throws IOException int numOfInt = fileSize / Integer.BYTES; ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); ByteBuffer underlying = ByteBuffer.allocate(fileSize); - // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + // Write 32KiB of ints and expect the write operation of the file channel will be triggered only once. EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java new file mode 100644 index 000000000000..711e93949f8f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.io.Closer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class LazilyAllocatingHeapWriteOutBytesTest +{ + private LazilyAllocatingHeapWriteOutBytes target; + private Closer closer; + private HeapByteBufferWriteOutBytes heapByteBufferWriteOutBytes; + + @Before + public void setUp() throws Exception + { + closer = Closer.create(); + heapByteBufferWriteOutBytes = new HeapByteBufferWriteOutBytes(); + target = new LazilyAllocatingHeapWriteOutBytes( + () -> heapByteBufferWriteOutBytes, + closer + ); + } + + @Test + public void testWritingToBuffer() throws IOException + { + Assert.assertNull(target.getTmpBuffer()); + + target.write(ByteBuffer.allocate(512)); + Assert.assertNotNull(target.getTmpBuffer()); + Assert.assertEquals(4096, target.getTmpBuffer().limit()); + Assert.assertNull(target.getDelegate()); + + target.write(ByteBuffer.allocate(16385)); + Assert.assertNull(target.getTmpBuffer()); + Assert.assertNotNull(target.getDelegate()); + Assert.assertEquals(16385 + 512, target.getDelegate().size()); + } + + @Test + public void testClosingWriteOutBytes() throws IOException + { + Assert.assertNull(target.getTmpBuffer()); + + target.writeInt(5); + Assert.assertNotNull(target.getTmpBuffer()); + Assert.assertEquals(128, target.getTmpBuffer().limit()); + Assert.assertNull(target.getDelegate()); + + closer.close(); + + Assert.assertNull(target.getTmpBuffer()); + } +} From 4bf8071087fa839cce08f76cb9c292b5b172e1f7 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sun, 14 Jul 2024 22:31:39 +0530 Subject: [PATCH 06/10] Fix checkstyle --- .../segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java index 711e93949f8f..32a5d09ba007 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/LazilyAllocatingHeapWriteOutBytesTest.java @@ -34,7 +34,7 @@ public class LazilyAllocatingHeapWriteOutBytesTest private HeapByteBufferWriteOutBytes heapByteBufferWriteOutBytes; @Before - public void setUp() throws Exception + public void setUp() { closer = Closer.create(); heapByteBufferWriteOutBytes = new HeapByteBufferWriteOutBytes(); From 003387ca1ab0bcd8089fe6ff01a70ec15ccaeb5e Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 17 Jul 2024 14:42:34 +0530 Subject: [PATCH 07/10] Add tests --- .../TmpFileSegmentWriteOutMedium.java | 13 +++ .../TmpFileSegmentWriteOutMediumTest.java | 80 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 8c7254f43581..93cff5ab4363 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.writeout; +import com.google.common.annotations.VisibleForTesting; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.io.Closer; @@ -133,4 +134,16 @@ public void close() throws IOException } } } + + @VisibleForTesting + int getFilesCreated() + { + return filesCreated.get(); + } + + @VisibleForTesting + int getNumLocallyCreated() + { + return numLocallyCreated; + } } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java new file mode 100644 index 000000000000..fb65837a866b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class TmpFileSegmentWriteOutMediumTest +{ + private volatile TmpFileSegmentWriteOutMedium writeOutMedium; + private ExecutorService executorService; + + @Before + public void setUp() throws IOException + { + writeOutMedium = new TmpFileSegmentWriteOutMedium(FileUtils.createTempDir()); + executorService = Execs.multiThreaded(3, "writeOutMedium-%d"); + } + + @After + public void tearDown() throws Exception + { + executorService.shutdownNow(); + } + + @Test + public void testFileCount() throws InterruptedException + { + final int threadCount = 3; + final CountDownLatch latch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.submit( + () -> { + WriteOutBytes writeOutBytes = writeOutMedium.makeWriteOutBytes(); + Assert.assertEquals(0, writeOutMedium.getNumLocallyCreated()); + try { + latch.countDown(); + latch.await(); + ByteBuffer allocate = ByteBuffer.allocate(4096); + writeOutBytes.write(allocate); + } + catch (Exception e) { + throw new RuntimeException(e); + } + Assert.assertEquals(1, writeOutMedium.getNumLocallyCreated()); + } + ); + } + executorService.awaitTermination(15, TimeUnit.SECONDS); + Assert.assertEquals(threadCount, writeOutMedium.getFilesCreated()); + } +} From 71d2a2593bceebbbdb896adb414f8e42476e85fc Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 17 Jul 2024 15:24:56 +0530 Subject: [PATCH 08/10] Fix tests --- .../segment/writeout/TmpFileSegmentWriteOutMediumTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java index fb65837a866b..11b1acdda24c 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMediumTest.java @@ -45,7 +45,7 @@ public void setUp() throws IOException } @After - public void tearDown() throws Exception + public void tearDown() { executorService.shutdownNow(); } From ef20cbc9db743296dc71a7105727175579a12822 Mon Sep 17 00:00:00 2001 From: imply-cheddar Date: Tue, 6 Aug 2024 14:12:34 -0500 Subject: [PATCH 09/10] Reintroduce old TmpFileSegmentWriteOutMedium Have it as a legacy option --- .../writeout/LegacyFileWriteOutBytes.java | 162 ++++++++++++++ .../LegacyTmpFileSegmentWriteOutMedium.java | 76 +++++++ ...cyTmpFileSegmentWriteOutMediumFactory.java | 46 ++++ .../SegmentWriteOutMediumFactory.java | 2 + .../TmpFileSegmentWriteOutMedium.java | 8 +- .../writeout/LegacyFileWriteOutBytesTest.java | 211 ++++++++++++++++++ 6 files changed, 501 insertions(+), 4 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java create mode 100644 processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java create mode 100644 processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java create mode 100644 processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java new file mode 100644 index 000000000000..71f2e589ec1e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import com.google.common.io.ByteStreams; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.IOE; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +final class LegacyFileWriteOutBytes extends WriteOutBytes +{ + private final File file; + private final FileChannel ch; + private long writeOutBytes; + + /** Purposely big-endian, for {@link #writeInt(int)} implementation */ + private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer + + LegacyFileWriteOutBytes(File file, FileChannel ch) + { + this.file = file; + this.ch = ch; + this.writeOutBytes = 0L; + } + + private void flushIfNeeded(int bytesNeeded) throws IOException + { + if (buffer.remaining() < bytesNeeded) { + flush(); + } + } + + @Override + public void flush() throws IOException + { + buffer.flip(); + try { + Channels.writeFully(ch, buffer); + } + catch (IOException e) { + throw new IOE(e, "Failed to write to file: %s. Current size of file: %d", file.getAbsolutePath(), writeOutBytes); + } + buffer.clear(); + } + + @Override + public void write(int b) throws IOException + { + flushIfNeeded(1); + buffer.put((byte) b); + writeOutBytes++; + } + + @Override + public void writeInt(int v) throws IOException + { + flushIfNeeded(Integer.BYTES); + buffer.putInt(v); + writeOutBytes += Integer.BYTES; + } + + @Override + public int write(ByteBuffer src) throws IOException + { + int len = src.remaining(); + flushIfNeeded(len); + while (src.remaining() > buffer.capacity()) { + int srcLimit = src.limit(); + try { + src.limit(src.position() + buffer.capacity()); + buffer.put(src); + writeOutBytes += buffer.capacity(); + flush(); + } + finally { + // IOException may occur in flush(), reset src limit to the original + src.limit(srcLimit); + } + } + int remaining = src.remaining(); + buffer.put(src); + writeOutBytes += remaining; + return len; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public long size() + { + return writeOutBytes; + } + + @Override + public void writeTo(WritableByteChannel channel) throws IOException + { + flush(); + ch.position(0); + try { + ByteStreams.copy(ch, channel); + } + finally { + ch.position(ch.size()); + } + } + + @Override + public void readFully(long pos, ByteBuffer buffer) throws IOException + { + if (pos < 0 || pos > writeOutBytes) { + throw new IAE("pos %d out of range [%d, %d]", pos, 0, writeOutBytes); + } + flush(); + ch.read(buffer, pos); + if (buffer.remaining() > 0) { + throw new BufferUnderflowException(); + } + } + + @Override + public InputStream asInputStream() throws IOException + { + flush(); + return new FileInputStream(file); + } + + @Override + public boolean isOpen() + { + return ch.isOpen(); + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java new file mode 100644 index 000000000000..55e6b5ff0bc1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.io.Closer; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; + +public final class LegacyTmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium +{ + private final File dir; + private final Closer closer = Closer.create(); + + LegacyTmpFileSegmentWriteOutMedium(File outDir) throws IOException + { + File tmpOutputFilesDir = new File(outDir, "tmpOutputFiles"); + FileUtils.mkdirp(tmpOutputFilesDir); + closer.register(() -> FileUtils.deleteDirectory(tmpOutputFilesDir)); + this.dir = tmpOutputFilesDir; + } + + @Override + public WriteOutBytes makeWriteOutBytes() throws IOException + { + File file = File.createTempFile("filePeon", null, dir); + FileChannel ch = FileChannel.open( + file.toPath(), + StandardOpenOption.READ, + StandardOpenOption.WRITE + ); + closer.register(file::delete); + closer.register(ch); + return new LegacyFileWriteOutBytes(file, ch); + } + + @Override + public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException + { + LegacyTmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new LegacyTmpFileSegmentWriteOutMedium(dir); + closer.register(tmpFileSegmentWriteOutMedium); + return tmpFileSegmentWriteOutMedium; + } + + @Override + public Closer getCloser() + { + return closer; + } + + @Override + public void close() throws IOException + { + closer.close(); + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java new file mode 100644 index 000000000000..7c0eb8bb59e5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.io.File; +import java.io.IOException; + +public final class LegacyTmpFileSegmentWriteOutMediumFactory implements SegmentWriteOutMediumFactory +{ + private static final LegacyTmpFileSegmentWriteOutMediumFactory INSTANCE = new LegacyTmpFileSegmentWriteOutMediumFactory(); + + @JsonCreator + public static LegacyTmpFileSegmentWriteOutMediumFactory instance() + { + return INSTANCE; + } + + private LegacyTmpFileSegmentWriteOutMediumFactory() + { + } + + @Override + public SegmentWriteOutMedium makeSegmentWriteOutMedium(File outDir) throws IOException + { + return new LegacyTmpFileSegmentWriteOutMedium(outDir); + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java index ae241b3187be..3e34af17f397 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumFactory.java @@ -30,6 +30,7 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TmpFileSegmentWriteOutMediumFactory.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "tmpFile", value = TmpFileSegmentWriteOutMediumFactory.class), + @JsonSubTypes.Type(name = "legacyTmpFile", value = LegacyTmpFileSegmentWriteOutMediumFactory.class), @JsonSubTypes.Type(name = "offHeapMemory", value = OffHeapMemorySegmentWriteOutMediumFactory.class), @JsonSubTypes.Type(name = "onHeapMemory", value = OnHeapMemorySegmentWriteOutMediumFactory.class), }) @@ -39,6 +40,7 @@ static Set builtInFactories() { return ImmutableSet.of( TmpFileSegmentWriteOutMediumFactory.instance(), + LegacyTmpFileSegmentWriteOutMediumFactory.instance(), OffHeapMemorySegmentWriteOutMediumFactory.instance(), OnHeapMemorySegmentWriteOutMediumFactory.instance() ); diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index 93cff5ab4363..58dc99198e3f 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -122,15 +122,15 @@ public Closer getCloser() @Override public void close() throws IOException { - log.info("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); + log.debug("Closing, files still open[%,d], filesBeingClosed[%,d], dir[%s]", filesCreated.get(), numLocallyCreated, dir); filesCreated.set(filesCreated.get() - numLocallyCreated); numLocallyCreated = 0; closer.close(); - if (root) { - log.info("Size distribution of files:"); + if (root && log.isDebugEnabled()) { + log.debug("Size distribution of files:"); for (Map.Entry entry : sizeDistribution.entrySet()) { - log.info("%,15d => %,15d", entry.getKey(), entry.getValue()); + log.debug("%,15d => %,15d", entry.getKey(), entry.getValue()); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java new file mode 100644 index 000000000000..f454cc6eb64c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import org.apache.druid.java.util.common.IAE; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class LegacyFileWriteOutBytesTest +{ + private LegacyFileWriteOutBytes fileWriteOutBytes; + private FileChannel mockFileChannel; + private File mockFile; + + @Before + public void setUp() + { + mockFileChannel = EasyMock.mock(FileChannel.class); + mockFile = EasyMock.mock(File.class); + fileWriteOutBytes = new LegacyFileWriteOutBytes(mockFile, mockFileChannel); + } + + @Test + public void write4KiBIntsShouldNotFlush() throws IOException + { + // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).times(1); + EasyMock.replay(mockFileChannel); + final int writeBytes = 4096; + final int numOfInt = writeBytes / Integer.BYTES; + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + // no need to flush up to 4KiB + // the first byte after 4KiB will cause a flush + fileWriteOutBytes.write(1); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeShouldIncrementSize() throws IOException + { + fileWriteOutBytes.write(1); + Assert.assertEquals(1, fileWriteOutBytes.size()); + } + + @Test + public void writeIntShouldIncrementSize() throws IOException + { + fileWriteOutBytes.writeInt(1); + Assert.assertEquals(4, fileWriteOutBytes.size()); + } + + @Test + public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).times(1); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 + 1); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).once(); + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new IOException()) + .once(); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1); + try { + fileWriteOutBytes.write(src); + Assert.fail("IOException should have been thrown."); + } + catch (IOException e) { + // The second invocation to flush bytes fails. So the size should count what has already been put successfully + Assert.assertEquals(4096 * 2, fileWriteOutBytes.size()); + } + } + + @Test + public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + ByteBuffer src = ByteBuffer.allocate(4096); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + } + @Test + public void sizeDoesNotFlush() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new AssertionError("file channel should not have been written to.")); + EasyMock.replay(mockFileChannel); + long size = fileWriteOutBytes.size(); + Assert.assertEquals(0, size); + fileWriteOutBytes.writeInt(10); + size = fileWriteOutBytes.size(); + Assert.assertEquals(4, size); + } + + @Test + public void testReadFullyWorks() throws IOException + { + int fileSize = 4096; + int numOfInt = fileSize / Integer.BYTES; + ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); + ByteBuffer underlying = ByteBuffer.allocate(fileSize); + // Write 4KiB of ints and expect the write operation of the file channel will be triggered only once. + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + underlying.position(0); + underlying.put(buffer); + return 0; + }).times(1); + EasyMock.expect(mockFileChannel.read(EasyMock.eq(destination), EasyMock.eq(100L * Integer.BYTES))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + long pos = (long) EasyMock.getCurrentArguments()[1]; + buffer.putInt(underlying.getInt((int) pos)); + return Integer.BYTES; + }).times(1); + EasyMock.replay(mockFileChannel); + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + Assert.assertEquals(underlying.capacity(), fileWriteOutBytes.size()); + + destination.position(0); + fileWriteOutBytes.readFully(100L * Integer.BYTES, destination); + destination.position(0); + Assert.assertEquals(100, destination.getInt()); + EasyMock.verify(mockFileChannel); + } + + @Test + public void testReadFullyOutOfBoundsDoesnt() throws IOException + { + int fileSize = 4096; + int numOfInt = fileSize / Integer.BYTES; + ByteBuffer destination = ByteBuffer.allocate(Integer.BYTES); + EasyMock.replay(mockFileChannel); + for (int i = 0; i < numOfInt; i++) { + fileWriteOutBytes.writeInt(i); + } + Assert.assertEquals(fileSize, fileWriteOutBytes.size()); + + destination.position(0); + Assert.assertThrows(IAE.class, () -> fileWriteOutBytes.readFully(5000, destination)); + EasyMock.verify(mockFileChannel); + } + + @Test + public void testIOExceptionHasFileInfo() throws Exception + { + IOException cause = new IOException("Too many bytes"); + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))).andThrow(cause); + EasyMock.expect(mockFile.getAbsolutePath()).andReturn("/tmp/file"); + EasyMock.replay(mockFileChannel, mockFile); + fileWriteOutBytes.writeInt(10); + fileWriteOutBytes.write(new byte[30]); + IOException actual = Assert.assertThrows(IOException.class, () -> fileWriteOutBytes.flush()); + Assert.assertEquals(String.valueOf(actual.getCause()), actual.getCause(), cause); + Assert.assertEquals(actual.getMessage(), actual.getMessage(), "Failed to write to file: /tmp/file. Current size of file: 34"); + } +} \ No newline at end of file From d12b9b7d79e138fd97947a628ece8f9c6c2f1e65 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 9 Aug 2024 15:33:36 +0530 Subject: [PATCH 10/10] Fix checkstyle --- .../apache/druid/segment/writeout/LegacyFileWriteOutBytes.java | 2 +- .../segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java | 2 +- .../writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java | 2 +- .../druid/segment/writeout/LegacyFileWriteOutBytesTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java index 71f2e589ec1e..192aa9b61c3f 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytes.java @@ -159,4 +159,4 @@ public boolean isOpen() { return ch.isOpen(); } -} \ No newline at end of file +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java index 55e6b5ff0bc1..a01280bf18df 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMedium.java @@ -73,4 +73,4 @@ public void close() throws IOException { closer.close(); } -} \ No newline at end of file +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java index 7c0eb8bb59e5..c89627401dc9 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/LegacyTmpFileSegmentWriteOutMediumFactory.java @@ -43,4 +43,4 @@ public SegmentWriteOutMedium makeSegmentWriteOutMedium(File outDir) throws IOExc { return new LegacyTmpFileSegmentWriteOutMedium(outDir); } -} \ No newline at end of file +} diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java index f454cc6eb64c..1ab2f5f785bd 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/LegacyFileWriteOutBytesTest.java @@ -208,4 +208,4 @@ public void testIOExceptionHasFileInfo() throws Exception Assert.assertEquals(String.valueOf(actual.getCause()), actual.getCause(), cause); Assert.assertEquals(actual.getMessage(), actual.getMessage(), "Failed to write to file: /tmp/file. Current size of file: 34"); } -} \ No newline at end of file +}