@@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertSame;
@@ -47,6 +48,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -360,7 +362,8 @@ public void testAddRemoveConsumer() throws Exception {
// 4. Verify active consumer
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
// get notified about who the leader is
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));

@@ -372,7 +375,8 @@ public void testAddRemoveConsumer() throws Exception {
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(3, consumers.size());
// get notified about who the leader is
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -387,13 +391,17 @@ public void testAddRemoveConsumer() throws Exception {
assertEquals(4, consumers.size());

// all consumers will receive notifications
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 0, true);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
@@ -419,9 +427,11 @@ public void testAddRemoveConsumer() throws Exception {
assertEquals(2, consumers.size());

// the remaining consumers will receive notifications
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);

// 10. Attempt to remove already removed consumer
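The change above replaces the unbounded consumerChanges.take() calls with poll(10, TimeUnit.SECONDS) followed by assertNotNull, so a missed notification fails the test within the timeout instead of hanging it forever. Below is a minimal sketch of the same bounded-wait pattern factored into a reusable helper; the helper class and method names are illustrative and not part of this change.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TestQueues {

    private TestQueues() {}

    // Wait up to the given timeout for the next element and fail fast if it
    // never arrives, instead of blocking the test forever like take() would.
    static <T> T takeWithTimeout(BlockingQueue<T> queue, long timeout, TimeUnit unit)
            throws InterruptedException {
        T element = queue.poll(timeout, unit);
        if (element == null) {
            throw new AssertionError(
                    "Timed out after " + timeout + " " + unit + " waiting for an element");
        }
        return element;
    }
}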
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util.collections;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Getter;

@NotThreadSafe
public class SegmentedLongArray implements AutoCloseable {

private static final int SIZE_OF_LONG = 8;

private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
private final List<ByteBuf> buffers = new ArrayList<>();

@Getter
private final long initialCapacity;

@Getter
private long capacity;

public SegmentedLongArray(long initialCapacity) {
long remainingToAdd = initialCapacity;

// Add first segment
int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG);
buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= sizeToAdd;

// Add the remaining segments, all at full segment size, if necessary
while (remainingToAdd > 0) {
buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= MAX_SEGMENT_SIZE;
}

this.initialCapacity = initialCapacity;
this.capacity = this.initialCapacity;
}

public void writeLong(long offset, long value) {
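// Note: the index math here (and in readLong) assumes every segment holds
// MAX_SEGMENT_SIZE longs. A first segment smaller than that only ever exists
// while it is the sole segment (increaseCapacity grows it in place up to
// MAX_SEGMENT_SIZE before any extra segment is appended), so the mapping holds.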
int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
}

public long readLong(long offset) {
int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
}

public void increaseCapacity() {
if (capacity < MAX_SEGMENT_SIZE) {
Contributor: The size of the last segment might also be < MAX_SEGMENT_SIZE; do we need to update the capacity of that buffer as well?

Contributor: A last segment smaller than MAX_SEGMENT_SIZE would also affect the writeLong and readLong methods, because we always use MAX_SEGMENT_SIZE to calculate the index.

Contributor Author: All the segments after the first one will be created directly at MAX_SEGMENT_SIZE and will not get expanded/shrunk, only added or removed.

Contributor (@codelipenghui, Jul 9, 2022):

Contributor Author: Ah, that's true! I need to fix the constructor.

Contributor Author: Fixed

// Resize the first buffer to a bigger capacity: double while small (<= 256 longs),
// then grow by 50%, capped at MAX_SEGMENT_SIZE below
capacity += (capacity <= 256 ? capacity : capacity / 2);
capacity = Math.min(capacity, MAX_SEGMENT_SIZE);
buffers.get(0).capacity((int) this.capacity * SIZE_OF_LONG);
buffers.get(0).writerIndex((int) this.capacity * SIZE_OF_LONG);
} else {
// Let's add 1 more buffer to the list
int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
buffer.writerIndex(bufferSize);
buffers.add(buffer);
capacity += MAX_SEGMENT_SIZE;
}
}

public void shrink(long newCapacity) {
if (newCapacity >= capacity || newCapacity < initialCapacity) {
return;
}

long sizeToReduce = capacity - newCapacity;
while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) {
ByteBuf b = buffers.remove(buffers.size() - 1);
b.release();
capacity -= MAX_SEGMENT_SIZE;
sizeToReduce -= MAX_SEGMENT_SIZE;
}

if (buffers.size() == 1 && sizeToReduce > 0) {
// We should also reduce the capacity of the first buffer
capacity -= sizeToReduce;
ByteBuf oldBuffer = buffers.get(0);
ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG);
oldBuffer.release();
buffers.set(0, newBuffer);
}
}

@Override
public void close() {
buffers.forEach(ByteBuf::release);
}

/**
* The amount of memory used to back the array of longs.
*/
public long bytesCapacity() {
return capacity * SIZE_OF_LONG;
}
}
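
For context, here is a small usage sketch of the class above (illustrative only, not part of this change): the array is grown explicitly with increaseCapacity() before writing past the current capacity, and shrink()/close() release the pooled memory.

import org.apache.pulsar.common.util.collections.SegmentedLongArray;

public class SegmentedLongArrayExample {
    public static void main(String[] args) {
        try (SegmentedLongArray longs = new SegmentedLongArray(4)) {
            // Write within the initial capacity
            for (long i = 0; i < 4; i++) {
                longs.writeLong(i, i * 10);
            }

            // Grow before writing past the current capacity
            while (longs.getCapacity() < 8) {
                longs.increaseCapacity();
            }
            longs.writeLong(7, 70);

            System.out.println(longs.readLong(3));      // 30
            System.out.println(longs.readLong(7));      // 70
            System.out.println(longs.bytesCapacity());  // backing memory in bytes

            // Give back capacity that is no longer needed
            longs.shrink(4);
        } // close() releases the pooled buffers
    }
}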