Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.java.util.common;

import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
Expand All @@ -39,8 +38,6 @@
*/
public class ByteBufferUtils
{
private static final Logger log = new Logger(ByteBufferUtils.class);

// the following MethodHandle lookup code is adapted from Apache Kafka
// https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -164,13 +165,70 @@ public void addFile(File file)
*
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
*
* @throws FileNotFoundException if the {@code file} does not exist
* @throws IOException if an I/O error occurs
* @throws FileNotFoundException if the {@code file} does not exist
* @throws IOException if an I/O error occurs
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
* @see FileChannel#map(FileChannel.MapMode, long, long)
*/
public static MappedByteBufferHandler map(File file) throws IOException
{
MappedByteBuffer mappedByteBuffer = com.google.common.io.Files.map(file);
return map(file, 0, file.length());
}

/**
* Fully maps a file read-only in to memory as per
* {@link FileChannel#map(FileChannel.MapMode, long, long)}.
*
* @param file the file to map
* @param offset starting offset for the mmap
* @param length length for the mmap
*
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
*
* @throws FileNotFoundException if the {@code file} does not exist
* @throws IOException if an I/O error occurs
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
* @see FileChannel#map(FileChannel.MapMode, long, long)
*/
public static MappedByteBufferHandler map(File file, long offset, long length) throws IOException
{
if (length > Integer.MAX_VALUE) {
throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
}

try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
final FileChannel channel = randomAccessFile.getChannel()) {
final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit should we use method at line 220 here ?

return new MappedByteBufferHandler(mappedByteBuffer);
}
}

/**
* Fully maps a file read-only in to memory as per
* {@link FileChannel#map(FileChannel.MapMode, long, long)}.
*
* @param randomAccessFile the file to map. The file will not be closed.
* @param offset starting offset for the mmap
* @param length length for the mmap
*
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code randomAccessFile}
*
* @throws IOException if an I/O error occurs
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
* @see FileChannel#map(FileChannel.MapMode, long, long)
*/
public static MappedByteBufferHandler map(
RandomAccessFile randomAccessFile,
long offset,
long length
) throws IOException
{
if (length > Integer.MAX_VALUE) {
throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
}

final FileChannel channel = randomAccessFile.getChannel();
final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
return new MappedByteBufferHandler(mappedByteBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

package org.apache.druid.java.util.common;

import org.apache.druid.collections.ResourceHolder;

import java.nio.MappedByteBuffer;

/**
* Facilitates using try-with-resources with {@link MappedByteBuffer}s which don't implement {@link AutoCloseable}.
*
* <p>This interface is a specialization of {@code org.apache.druid.collections.ResourceHandler}.
* Facilitates using try-with-resources with {@link MappedByteBuffer}.
*
* @see FileUtils#map
*/
public final class MappedByteBufferHandler implements AutoCloseable
public final class MappedByteBufferHandler implements ResourceHolder<MappedByteBuffer>
{
private final MappedByteBuffer mappedByteBuffer;

Expand All @@ -40,6 +40,7 @@ public final class MappedByteBufferHandler implements AutoCloseable
/**
* Returns the wrapped buffer.
*/
@Override
public MappedByteBuffer get()
{
return mappedByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,38 @@ public void testMap() throws IOException
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
}

@Test
public void testMapFileTooLarge() throws IOException
{
File dataFile = temporaryFolder.newFile("data");
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
raf.write(42);
raf.setLength(1 << 20); // 1 MiB
}
final IllegalArgumentException e = Assert.assertThrows(
IllegalArgumentException.class,
() -> FileUtils.map(dataFile, 0, (long) Integer.MAX_VALUE + 1)
);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
}

@Test
public void testMapRandomAccessFileTooLarge() throws IOException
{
File dataFile = temporaryFolder.newFile("data");
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
raf.write(42);
raf.setLength(1 << 20); // 1 MiB
}
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r")) {
final IllegalArgumentException e = Assert.assertThrows(
IllegalArgumentException.class,
() -> FileUtils.map(raf, 0, (long) Integer.MAX_VALUE + 1)
);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
}
}

@Test
public void testWriteAtomically() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@

package org.apache.druid.frame.allocation;

import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -239,28 +234,6 @@ public long size()
return sz;
}

/**
* Write current memory to a channel.
*/
public void writeTo(final WritableByteChannel channel) throws IOException
{
for (int i = 0; i < blockHolders.size(); i++) {
final ResourceHolder<WritableMemory> memoryHolder = blockHolders.get(i);
final WritableMemory memory = memoryHolder.get();
final int limit = limits.getInt(i);

if (memory.hasByteBuffer()) {
final ByteBuffer byteBuffer = memory.getByteBuffer().duplicate();
byteBuffer.limit(Ints.checkedCast(memory.getRegionOffset(limit)));
byteBuffer.position(Ints.checkedCast(memory.getRegionOffset(0)));
Channels.writeFully(channel, byteBuffer);
} else {
// No implementation currently for Memory without backing ByteBuffer. (It's never needed.)
throw new UnsupportedOperationException("Cannot write Memory without backing ByteBuffer");
}
}
}

/**
* Write current memory to a {@link WritableMemory} buffer.
*/
Expand Down
Loading