Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1848,7 +1848,7 @@ public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName
.setSql(
"select cityName, added, SUM(added) OVER () cc from wikipedia")
.setQueryContext(customContext)
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15921, 200))
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200))
.verifyResults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public class AppendableMemory implements Closeable
// One holder for every Memory we've allocated.
private final List<ResourceHolder<WritableMemory>> blockHolders = new ArrayList<>();

// The amount of space that has been used from each Memory block. Same length as "memoryHolders".
// The amount of space that has been used from each Memory block. Same length as "blockHolders".
private final IntList limits = new IntArrayList();

// The global starting position for each Memory block (blockNumber -> position). Same length as "memoryHolders".
// The global starting position for each Memory block (blockNumber -> position). Same length as "blockHolders".
private final LongArrayList globalStartPositions = new LongArrayList();

// Whether the blocks we've allocated are "packed"; meaning all non-final block limits equal the allocationSize.
Expand Down Expand Up @@ -104,6 +104,36 @@ public MemoryRange<WritableMemory> cursor()
return cursor;
}

/**
* Maximum number that can be successfully passed to {@link #reserveAdditional(int)}.
*/
public int availableToReserve()
{
final int currentBlockIdx = currentBlockNumber();
final long availableInCurrentBlock;
final boolean currentBlockIsEmpty;

if (currentBlockIdx < 0) {
availableInCurrentBlock = 0;
currentBlockIsEmpty = false;
} else {
final int usedInCurrentBlock = limits.getInt(currentBlockIdx);
availableInCurrentBlock = blockHolders.get(currentBlockIdx).get().getCapacity() - usedInCurrentBlock;
currentBlockIsEmpty = usedInCurrentBlock == 0;
}

// If currentBlockIsEmpty, add availableInCurrentBlock to account for reclamation in reclaimLastBlockIfEmpty().
final long availableInAllocator = allocator.available() + (currentBlockIsEmpty ? availableInCurrentBlock : 0);

return (int) Math.min(
Integer.MAX_VALUE,
Math.max(
availableInAllocator,
availableInCurrentBlock
)
);
}

/**
* Ensure that at least "bytes" amount of space is available after the cursor. Allocates a new block if needed.
* Note: the amount of bytes is guaranteed to be in a *single* block.
Expand All @@ -126,11 +156,13 @@ public boolean reserveAdditional(final int bytes)
return true;
}

releaseLastBlockIfEmpty();

if (bytes > allocator.available()) {
return false;
}

final int idx = blockHolders.size() - 1;
final int idx = currentBlockNumber();

if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
// Allocation needed.
Expand Down Expand Up @@ -228,6 +260,9 @@ public void rewindCursor(final int bytes)
cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() - newLimit);
}

/**
* Current used size, in bytes.
*/
public long size()
{
long sz = 0;
Expand Down Expand Up @@ -295,12 +330,21 @@ private void addBlock(final ResourceHolder<WritableMemory> block)
cursor.set(blockMemory, 0, blockMemory.getCapacity());
}

private int currentBlockNumber()
private void releaseLastBlockIfEmpty()
{
if (blockHolders.isEmpty()) {
return NO_BLOCK;
} else {
return blockHolders.size() - 1;
final int lastBlockNumber = currentBlockNumber();
if (lastBlockNumber != NO_BLOCK && limits.getInt(lastBlockNumber) == 0) {
blockHolders.remove(lastBlockNumber).close();
limits.removeInt(lastBlockNumber);
}
}

/**
* Returns the index into {@link #blockHolders} and {@link #limits} of the current block, or {@link #NO_BLOCK}
* if there are no blocks.
*/
private int currentBlockNumber()
{
return blockHolders.size() - 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ArenaMemoryAllocator implements MemoryAllocator
private final WritableMemory arena;
private long allocations = 0;
private long position = 0;
private WritableMemory lastAllocation;

private ArenaMemoryAllocator(WritableMemory arena)
{
Expand Down Expand Up @@ -64,20 +65,23 @@ public static ArenaMemoryAllocator createOnHeap(final int capacity)
@Override
public Optional<ResourceHolder<WritableMemory>> allocate(final long size)
{
if (position + size < arena.getCapacity()) {
if (position + size <= arena.getCapacity()) {
final long start = position;
allocations++;
position += size;

final WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN);
lastAllocation = memory;

return Optional.of(
new ResourceHolder<WritableMemory>()
{
private WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN);
boolean closed;

@Override
public WritableMemory get()
{
if (memory == null) {
if (closed) {
throw new ISE("Already closed");
}

Expand All @@ -87,10 +91,21 @@ public WritableMemory get()
@Override
public void close()
{
memory = null;
if (closed) {
return;
}

closed = true;

//noinspection ObjectEquality
if (memory == lastAllocation) {
// Last allocation closed; decrement position to enable partial arena reuse.
position -= memory.getCapacity();
lastAllocation = null;
}

if (--allocations == 0) {
// All allocations closed; reset position to enable arena reuse.
// All allocations closed; reset position to enable full arena reuse.
position = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ public WritableMemory get()
@Override
public void close()
{
memory = null;
bytesAllocated -= size;
if (memory != null) {
memory = null;
bytesAllocated -= size;
}
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.AppendableMemory;
Expand Down Expand Up @@ -313,10 +314,22 @@ private boolean writeDataUsingFieldWriters()
// Reset to beginning of loop.
i = -1;

final int priorAllocation = BASE_DATA_ALLOCATION_SIZE * reserveMultiple;

// Try again with a bigger allocation.
reserveMultiple *= 2;

if (!dataMemory.reserveAdditional(Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple))) {
final int nextAllocation = Math.min(
dataMemory.availableToReserve(),
Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple)
);

if (nextAllocation > priorAllocation) {
if (!dataMemory.reserveAdditional(nextAllocation)) {
// Shouldn't see this unless availableToReserve lied to us.
throw DruidException.defensive("Unexpected failure of dataMemory.reserveAdditional");
}
} else {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.allocation;

public class ArenaMemoryAllocatorTest extends BaseMemoryAllocatorTest
{
@Override
protected MemoryAllocator makeAllocator(int capacity)
{
return ArenaMemoryAllocator.createOnHeap(capacity);
}
}
Loading