From 2a6002aee9c23d3e318dfab1fa7a2a5d84a1721f Mon Sep 17 00:00:00 2001 From: conghuhu <56248584+conghuhu@users.noreply.github.com> Date: Thu, 26 Oct 2023 02:23:35 -0500 Subject: [PATCH 1/5] feat(WIP): add IntMapByDynamicHash (#2294) --- .../util/collection/IntMapByDynamicHash.java | 790 ++++++++++++++++++ .../unit/util/collection/IntMapTest.java | 88 +- .../map/MapRandomGetPutThroughputTest.java | 32 +- 3 files changed, 896 insertions(+), 14 deletions(-) create mode 100644 hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java new file mode 100644 index 0000000000..2849c63b42 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hugegraph.util.collection; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import sun.misc.Unsafe; + +public class IntMapByDynamicHash implements IntMap { + + private static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The maximum capacity, used if a higher value is implicitly specified + * by either of the constructors with arguments. + * MUST be a power of two <= 1<<30. + */ + private static final int MAXIMUM_CAPACITY = 1 << 30; + + private static final float LOAD_FACTOR = 0.75f; + + private static final int PARTITIONED_SIZE_THRESHOLD = 4096; + + private static final int NULL_VALUE = Integer.MIN_VALUE; + + private static final AtomicReferenceFieldUpdater + TABLE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(IntMapByDynamicHash.class, Entry[].class, + "table"); + + private volatile Entry[] table; + + /** + * Partition counting to improve the concurrency performance of addToSize() + */ + private int[] partitionedSize; + + private static final Entry RESIZING = new Entry(NULL_VALUE, NULL_VALUE, 1); + private static final Entry RESIZED = new Entry(NULL_VALUE, NULL_VALUE, 2); + + private static final Entry RESIZE_SENTINEL = new Entry(NULL_VALUE, NULL_VALUE, 3); + + /** + * must be 2^n - 1 + */ + private static final int SIZE_BUCKETS = 7; + + + /* ---------------- Table element access -------------- */ + private static Object tableAt(Object[] array, int index) { + return UNSAFE.getObjectVolatile(array, + ((long) index << ENTRY_ARRAY_SHIFT) + + ENTRY_ARRAY_BASE); + } + + private static boolean casTableAt(Object[] array, int index, Object expected, Object newValue) { + return UNSAFE.compareAndSwapObject( + array, + ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, + expected, + newValue); + } + + private static void setTableAt(Object[] array, int index, Object newValue) { + UNSAFE.putObjectVolatile(array, ((long) index << ENTRY_ARRAY_SHIFT) + 
ENTRY_ARRAY_BASE, + newValue); + } + + private static int tableSizeFor(int c) { + int n = c - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; + } + + @SuppressWarnings("UnusedDeclaration") + private volatile int size; // updated via atomic field updater + + public IntMapByDynamicHash() { + this(DEFAULT_INITIAL_CAPACITY); + } + + public IntMapByDynamicHash(int initialCapacity) { + if (initialCapacity < 0) { + throw new IllegalArgumentException("Illegal Initial Capacity: " + initialCapacity); + } + if (initialCapacity > MAXIMUM_CAPACITY) { + initialCapacity = MAXIMUM_CAPACITY; + } + long size = (long) (1.0 + (long) initialCapacity / LOAD_FACTOR); + int cap = (size >= (long) MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int) size); + if (cap >= PARTITIONED_SIZE_THRESHOLD) { + /* + we want 7 extra slots and 64 bytes for each + slot. int is 4 bytes, so 64 bytes is 16 ints. + */ + this.partitionedSize = + new int[SIZE_BUCKETS * 16]; + } + // The end index is for resizeContainer + this.table = new Entry[cap + 1]; + } + + @Override + public boolean put(int key, int value) { + int hash = this.hash(key); + Entry[] currentArray = this.table; + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, hash); + if (o == null) { + Entry newEntry = new Entry(key, value); + this.addToSize(1); + if (IntMapByDynamicHash.casTableAt(currentArray, hash, null, newEntry)) { + return true; + } + this.addToSize(-1); + } + + this.slowPut(key, value, currentArray); + return true; + } + + private int slowPut(int key, int value, Entry[] currentTable) { + int length; + int index; + Entry o; + + while (true) { + length = currentTable.length; + index = this.hash(key, length); + o = (Entry) IntMapByDynamicHash.tableAt(currentTable, index); + + if (o == RESIZED || o == RESIZING) { + currentTable = this.helpWithResizeWhileCurrentIndex(currentTable, index); + } else { + Entry e = 
o; + boolean found = false; + + // Search for the key in the chain + while (e != null) { + int candidate = e.getKey(); + if (candidate == key) { + found = true; + break; + } + e = e.getNext(); + } + + if (found) { + int oldVal = e.getValue(); + // Key found, replace the entry + Entry newEntry = + new Entry(key, value, this.createReplacementChainForRemoval(o, e)); + if (IntMapByDynamicHash.casTableAt(currentTable, index, o, newEntry)) { + return oldVal; + } + } else { + // Key not found, add a new entry + Entry newEntry = new Entry(key, value, o); + if (IntMapByDynamicHash.casTableAt(currentTable, index, o, newEntry)) { + this.incrementSizeAndPossiblyResize(currentTable, length, o); + return NULL_VALUE; + } + } + } + } + } + + @Override + public int get(int key) { + int hash = this.hash(key); + Entry[] currentArray = this.table; + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, hash); + if (o == RESIZED || o == RESIZING) { + return this.slowGet(key, currentArray); + } + for (Entry e = o; e != null; e = e.getNext()) { + int k; + if ((k = e.getKey()) == key || key == k) { + return e.value; + } + } + return NULL_VALUE; + } + + private int slowGet(int key, Entry[] currentArray) { + while (true) { + int length = currentArray.length; + int hash = this.hash(key, length); + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, hash); + if (o == RESIZED || o == RESIZING) { + currentArray = this.helpWithResizeWhileCurrentIndex(currentArray, hash); + } else { + Entry e = o; + while (e != null) { + int candidate = e.getKey(); + if (candidate == key) { + return e.getValue(); + } + e = e.getNext(); + } + return NULL_VALUE; + } + } + } + + @Override + public boolean remove(int key) { + int hash = this.hash(key); + Entry[] currentTable = this.table; + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentTable, hash); + if (o == RESIZED || o == RESIZING) { + return this.slowRemove(key, currentTable) != null; + } + + Entry e = o; + while (e != null) { + int 
candidate = e.getKey(); + if (candidate == key) { + Entry replacement = this.createReplacementChainForRemoval(o, e); + if (IntMapByDynamicHash.casTableAt(currentTable, hash, o, replacement)) { + this.addToSize(-1); + return true; + } + return this.slowRemove(key, currentTable) != null; + } + e = e.getNext(); + } + return false; + } + + private Entry slowRemove(int key, Entry[] currentTable) { + int length; + int index; + Entry o; + + while (true) { + length = currentTable.length; + index = this.hash(key, length); + o = (Entry) IntMapByDynamicHash.tableAt(currentTable, index); + if (o == RESIZED || o == RESIZING) { + currentTable = this.helpWithResizeWhileCurrentIndex(currentTable, index); + } else { + Entry e = o; + Entry prev = null; + + while (e != null) { + int candidate = e.getKey(); + if (candidate == key) { + Entry replacement = this.createReplacementChainForRemoval(o, e); + if (IntMapByDynamicHash.casTableAt(currentTable, index, o, replacement)) { + this.addToSize(-1); + return e; + } + // Key found, but CAS failed, restart the loop + break; + } + prev = e; + e = e.getNext(); + } + + if (prev != null) { + // Key not found + return null; + } + } + } + } + + @Override + public boolean containsKey(int key) { + return this.getEntry(key) != null; + } + + @Override + public IntIterator keys() { + // TODO impl + return null; + } + + @Override + public IntIterator values() { + // TODO impl + return null; + } + + @Override + public void clear() { + Entry[] currentArray = this.table; + ResizeContainer resizeContainer; + do { + resizeContainer = null; + for (int i = 0; i < currentArray.length - 1; i++) { + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, i); + if (o == RESIZED || o == RESIZING) { + resizeContainer = (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, + currentArray.length - + 1); + } else if (o != null) { + Entry e = o; + if (IntMapByDynamicHash.casTableAt(currentArray, i, o, null)) { + int removedEntries = 0; + while (e != null) { + 
removedEntries++; + e = e.getNext(); + } + this.addToSize(-removedEntries); + } + } + } + if (resizeContainer != null) { + if (resizeContainer.isNotDone()) { + this.helpWithResize(currentArray); + resizeContainer.waitForAllResizers(); + } + currentArray = resizeContainer.nextArray; + } + } while (resizeContainer != null); + } + + @Override + public int size() { + int localSize = this.size; + if (this.partitionedSize != null) { + for (int i = 0; i < SIZE_BUCKETS; i++) { + localSize += this.partitionedSize[i << 4]; + } + } + return localSize; + } + + @Override + public boolean concurrent() { + return true; + } + + private int hash(int key) { + return key & (table.length - 2); + } + + private int hash(int key, int length) { + return key & (length - 2); + } + + private Entry getEntry(int key) { + Entry[] currentArray = this.table; + while (true) { + int length = currentArray.length; + int index = this.hash(key, length); + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, index); + if (o == RESIZED || o == RESIZING) { + currentArray = this.helpWithResizeWhileCurrentIndex(currentArray, index); + } else { + Entry e = o; + while (e != null) { + int candidate = e.getKey(); + if (candidate == key) { + return e; + } + e = e.getNext(); + } + return null; + } + } + } + + private void addToSize(int value) { + if (this.partitionedSize != null) { + if (this.incrementPartitionedSize(value)) { + return; + } + } + this.incrementLocalSize(value); + } + + private boolean incrementPartitionedSize(int value) { + int h = (int) Thread.currentThread().getId(); + h ^= (h >>> 18) ^ (h >>> 12); + h = (h ^ (h >>> 10)) & SIZE_BUCKETS; + if (h != 0) { + h = (h - 1) << 4; + long address = ((long) h << INT_ARRAY_SHIFT) + INT_ARRAY_BASE; + while (true) { + int localSize = UNSAFE.getIntVolatile(this.partitionedSize, address); + if (UNSAFE.compareAndSwapInt(this.partitionedSize, address, localSize, + localSize + value)) { + return true; + } + } + } + return false; + } + + private void 
incrementLocalSize(int value) { + while (true) { + int localSize = this.size; + if (UNSAFE.compareAndSwapInt(this, SIZE_OFFSET, localSize, localSize + value)) { + break; + } + } + } + + private Entry createReplacementChainForRemoval(Entry original, + Entry toRemove) { + if (original == toRemove) { + return original.getNext(); + } + Entry replacement = null; + Entry e = original; + while (e != null) { + if (e != toRemove) { + replacement = new Entry(e.getKey(), e.getValue(), replacement); + } + e = e.getNext(); + } + return replacement; + } + + private void incrementSizeAndPossiblyResize(Entry[] currentArray, int length, Entry prev) { + this.addToSize(1); + if (prev != null) { + int localSize = this.size(); + int threshold = (int) (length * LOAD_FACTOR); // threshold = length * 0.75 + if (localSize + 1 > threshold) { + this.resize(currentArray); + } + } + } + + private Entry[] helpWithResizeWhileCurrentIndex(Entry[] currentArray, int index) { + Entry[] newArray = this.helpWithResize(currentArray); + int helpCount = 0; + while (IntMapByDynamicHash.tableAt(currentArray, index) != RESIZED) { + helpCount++; + newArray = this.helpWithResize(currentArray); + if ((helpCount & 7) == 0) { + Thread.yield(); + } + } + return newArray; + } + + private void resize(Entry[] oldTable) { + this.resize(oldTable, (oldTable.length - 1 << 1) + 1); + } + + // newSize must be a power of 2 + 1 + @SuppressWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") + private void resize(Entry[] oldTable, int newSize) { + int oldCapacity = oldTable.length; + int end = oldCapacity - 1; + Entry last = (Entry) IntMapByDynamicHash.tableAt(oldTable, end); + if (this.size() < end && last == RESIZE_SENTINEL) { + return; + } + if (oldCapacity >= MAXIMUM_CAPACITY) { + throw new RuntimeException("max capacity of map exceeded"); + } + ResizeContainer resizeContainer = null; + // This ownResize records whether current thread need to perform the expansion operation of + // the map by itself + boolean ownResize = 
false; + if (last == null || last == RESIZE_SENTINEL) { + // allocating a new array is too expensive to make this an atomic operation + synchronized (oldTable) { + if (IntMapByDynamicHash.tableAt(oldTable, end) == null) { + IntMapByDynamicHash.setTableAt(oldTable, end, RESIZE_SENTINEL); + if (this.partitionedSize == null && newSize >= PARTITIONED_SIZE_THRESHOLD) { + this.partitionedSize = new int[SIZE_BUCKETS * 16]; + } + resizeContainer = new ResizeContainer(new Entry[newSize], oldTable.length - 1); + IntMapByDynamicHash.setTableAt(oldTable, end, resizeContainer); + ownResize = true; + } + } + } + if (ownResize) { + this.transfer(oldTable, resizeContainer); + + Entry[] src = this.table; + while (!TABLE_UPDATER.compareAndSet(this, oldTable, resizeContainer.nextArray)) { + /* + we're in a double resize situation; we'll have to go help until it's our turn + to set the table + */ + if (src != oldTable) { + this.helpWithResize(src); + } + } + } else { + this.helpWithResize(oldTable); + } + } + + /** + * Transfer all entries from src to dest tables + */ + private void transfer(Entry[] src, ResizeContainer resizeContainer) { + Entry[] dest = resizeContainer.nextArray; + + for (int j = 0; j < src.length - 1; ) { + Entry o = (Entry) IntMapByDynamicHash.tableAt(src, j); + if (o == null) { + if (IntMapByDynamicHash.casTableAt(src, j, null, RESIZED)) { + j++; + } + } else if (o == RESIZED || o == RESIZING) { + /* + During the expansion process, other threads have already migrated the elements at + this location to the new array. This means that the elements in the current + position have already been processed and do not need to be migrated again. + */ + j = (j & ~(ResizeContainer.QUEUE_INCREMENT - 1)) + ResizeContainer.QUEUE_INCREMENT; + /* + When there is only one thread for expansion, there is no concurrency issue + and there is no need to wait. 
+ */ + if (resizeContainer.resizers.get() == 1) { + break; + } + } else { + Entry e = o; + if (IntMapByDynamicHash.casTableAt(src, j, o, RESIZING)) { + while (e != null) { + this.unconditionalCopy(dest, e); + e = e.getNext(); + } + IntMapByDynamicHash.setTableAt(src, j, RESIZED); + j++; + } + } + } + resizeContainer.decrementResizerAndNotify(); + resizeContainer.waitForAllResizers(); + } + + /** + * Enable the current thread to participate in the expansion + */ + private Entry[] helpWithResize(Entry[] currentArray) { + ResizeContainer resizeContainer = + (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, + currentArray.length - 1); + Entry[] newTable = resizeContainer.nextArray; + if (resizeContainer.getQueuePosition() > ResizeContainer.QUEUE_INCREMENT) { + resizeContainer.incrementResizer(); + this.reverseTransfer(currentArray, resizeContainer); + resizeContainer.decrementResizerAndNotify(); + } + return newTable; + } + + private void reverseTransfer(Entry[] src, ResizeContainer resizeContainer) { + Entry[] dest = resizeContainer.nextArray; + while (resizeContainer.getQueuePosition() > 0) { + int start = resizeContainer.subtractAndGetQueuePosition(); + int end = start + ResizeContainer.QUEUE_INCREMENT; + if (end > 0) { + if (start < 0) { + start = 0; + } + for (int j = end - 1; j >= start; ) { + Entry o = (Entry) IntMapByDynamicHash.tableAt(src, j); + if (o == null) { + if (IntMapByDynamicHash.casTableAt(src, j, null, RESIZED)) { + j--; + } + } else if (o == RESIZED || o == RESIZING) { + resizeContainer.zeroOutQueuePosition(); + return; + } else { + Entry e = o; + if (IntMapByDynamicHash.casTableAt(src, j, o, RESIZING)) { + while (e != null) { + this.unconditionalCopy(dest, e); + e = e.getNext(); + } + IntMapByDynamicHash.setTableAt(src, j, RESIZED); + j--; + } + } + } + } + } + } + + private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { + Entry[] currentArray = dest; + while (true) { + int length = currentArray.length; + int index = 
this.hash(toCopyEntry.getKey(), length); + Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, index); + if (o == RESIZED || o == RESIZING) { + currentArray = ((ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, + length - + 1)).nextArray; + } else { + Entry newEntry; + if (o == null) { + if (toCopyEntry.getNext() == null) { + newEntry = toCopyEntry; // no need to duplicate + } else { + newEntry = new Entry(toCopyEntry.getKey(), toCopyEntry.getValue()); + } + } else { + newEntry = + new Entry(toCopyEntry.getKey(), toCopyEntry.getValue(), o); + } + if (IntMapByDynamicHash.casTableAt(currentArray, index, o, newEntry)) { + return; + } + } + } + } + + private static final class ResizeContainer extends Entry { + private static final int QUEUE_INCREMENT = + Math.min(1 << 10, + Integer.highestOneBit(Runtime.getRuntime().availableProcessors()) << 4); + private final AtomicInteger resizers = new AtomicInteger(1); + private final Entry[] nextArray; + private final AtomicInteger queuePosition; + + private ResizeContainer(Entry[] nextArray, int oldSize) { + super(NULL_VALUE, NULL_VALUE, 4); + this.nextArray = nextArray; + this.queuePosition = new AtomicInteger(oldSize); + } + + public void incrementResizer() { + this.resizers.incrementAndGet(); + } + + public void decrementResizerAndNotify() { + int remaining = this.resizers.decrementAndGet(); + if (remaining == 0) { + synchronized (this) { + this.notifyAll(); + } + } + } + + public int getQueuePosition() { + return this.queuePosition.get(); + } + + public int subtractAndGetQueuePosition() { + return this.queuePosition.addAndGet(-QUEUE_INCREMENT); + } + + public void waitForAllResizers() { + if (this.resizers.get() > 0) { + for (int i = 0; i < 16; i++) { + if (this.resizers.get() == 0) { + break; + } + } + for (int i = 0; i < 16; i++) { + if (this.resizers.get() == 0) { + break; + } + Thread.yield(); + } + } + if (this.resizers.get() > 0) { + synchronized (this) { + while (this.resizers.get() > 0) { + try { + 
this.wait(); + } catch (InterruptedException e) { + //ginore + } + } + } + } + } + + public boolean isNotDone() { + return this.resizers.get() > 0; + } + + public void zeroOutQueuePosition() { + this.queuePosition.set(0); + } + } + + private static class Entry { + final int key; + volatile int value; + volatile Entry next; + + /** + * 0 NORMAL + * 1 RESIZING + * 2 RESIZED + * 3 RESIZE_SENTINEL + * 4 ResizeContainer + */ + final int state; + + public Entry(int key, int value, int state) { + this.key = key; + this.value = value; + this.state = state; + } + + public Entry(int key, int value) { + this.key = key; + this.value = value; + this.next = null; + this.state = 0; + } + + public Entry(int key, int value, Entry next) { + this.key = key; + this.value = value; + this.next = next; + this.state = 0; + } + + public int getKey() { + return key; + } + + public int getValue() { + return value; + } + + public Entry getNext() { + return next; + } + + @Override + public String toString() { + return this.key + "=" + this.value; + } + } + + /* ---------------- Unsafe mechanics -------------- */ + private static final Unsafe UNSAFE = IntSet.UNSAFE; + private static final long ENTRY_ARRAY_BASE; + private static final int ENTRY_ARRAY_SHIFT; + private static final long INT_ARRAY_BASE; + private static final int INT_ARRAY_SHIFT; + private static final long SIZE_OFFSET; + + static { + try { + Class tableClass = Entry[].class; + ENTRY_ARRAY_BASE = UNSAFE.arrayBaseOffset(tableClass); + int objectArrayScale = UNSAFE.arrayIndexScale(tableClass); + if ((objectArrayScale & (objectArrayScale - 1)) != 0) { + throw new AssertionError("data type scale not a power of two"); + } + ENTRY_ARRAY_SHIFT = 31 - Integer.numberOfLeadingZeros(objectArrayScale); + + Class intArrayClass = int[].class; + INT_ARRAY_BASE = UNSAFE.arrayBaseOffset(intArrayClass); + int intArrayScale = UNSAFE.arrayIndexScale(intArrayClass); + if ((intArrayScale & (intArrayScale - 1)) != 0) { + throw new AssertionError("data 
type scale not a power of two"); + } + INT_ARRAY_SHIFT = 31 - Integer.numberOfLeadingZeros(intArrayScale); + + Class mapClass = IntMapByDynamicHash.class; + SIZE_OFFSET = UNSAFE.objectFieldOffset(mapClass.getDeclaredField("size")); + } catch (NoSuchFieldException | SecurityException e) { + throw new AssertionError(e); + } + } +} diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java index 4e2ca9c388..4d1dfe703d 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java @@ -21,17 +21,23 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import org.apache.hugegraph.testutil.Assert; import org.apache.hugegraph.unit.BaseUnitTest; import org.apache.hugegraph.util.collection.IntIterator; import org.apache.hugegraph.util.collection.IntMap; +import org.apache.hugegraph.util.collection.IntMapByDynamicHash; +import org.apache.hugegraph.util.collection.IntMapByDynamicHash2; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; public class IntMapTest extends BaseUnitTest { @@ -412,6 +418,78 @@ public void testIntFixedMapBySegmentsValuesWithMultiSegs() { } } + @Test + public void 
testIntMapByDynamicHashSingleThread() { + IntMapByDynamicHash2 map = new IntMapByDynamicHash2(); + int mapSize = 2000; + for (int i = 0; i < mapSize; i++) { + map.put(i, i + 1); + Assert.assertTrue(map.containsKey(i)); + Assert.assertFalse(map.containsKey(i + mapSize)); + Assert.assertEquals(i + 1, map.get(i)); + } + + for (int i = mapSize - 1; i >= 0; i--) { + map.put(i, i - 1); + Assert.assertTrue(map.containsKey(i)); + Assert.assertFalse(map.containsKey(i + mapSize)); + Assert.assertEquals(i - 1, map.get(i)); + } + + Assert.assertEquals(mapSize, map.size()); + map.clear(); + Assert.assertEquals(0, map.size()); + } + + @Test + public void testIntMapByDynamicHashMultiThread() throws InterruptedException { + IntMapByDynamicHash2 map = new IntMapByDynamicHash2(); + + //int cpus = IntSet.CPUS; + int cpus = 16; + ThreadPoolExecutor executor = + new ThreadPoolExecutor(cpus, cpus, 1, TimeUnit.MINUTES, + new LinkedBlockingDeque<>()) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t != null) { + Assert.fail(t.getMessage()); + } + } + }; + ; + + AtomicInteger size = new AtomicInteger(); + int mapSize = 100; + CountDownLatch latch = new CountDownLatch(cpus); + for (int i = 1; i <= cpus; i++) { + int index = i; + executor.submit(() -> { + try { + for (int j = 0; j < mapSize; j++) { + int key = j + (index - 1) * mapSize; + map.put(key, j); + size.getAndIncrement(); + //Assert.assertTrue(map.containsKey(key)); + Assert.assertEquals(j, map.get(key)); + //System.out.println(key + " " + j); + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + System.out.println(); + + Assert.assertEquals(size.get(), map.size()); + } + private IntMap fixed(int capacity) { return new IntMap.IntMapByFixedAddr(capacity); } diff --git 
a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java index eafe4b861f..c1f04fa733 100644 --- a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java +++ b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java @@ -23,6 +23,7 @@ import org.apache.hugegraph.benchmark.BenchmarkConstants; import org.apache.hugegraph.benchmark.SimpleRandom; import org.apache.hugegraph.util.collection.IntMap; +import org.apache.hugegraph.util.collection.IntMapByDynamicHash; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -59,7 +60,9 @@ public class MapRandomGetPutThroughputTest { private IntMap.IntMapBySegments intMapBySegments; - private IntMap.IntMapByEcSegment intMapByEcSegments; + private IntMapByDynamicHash intMapByDynamicHashNonCap; + + private IntMapByDynamicHash intMapByDynamicHash; private static final int THREAD_COUNT = 8; @@ -70,7 +73,8 @@ public void prepareMap() { this.concurrentHashMapNonCap = new ConcurrentHashMap<>(); this.concurrentHashMap = new ConcurrentHashMap<>(MAP_CAPACITY); this.intMapBySegments = new IntMap.IntMapBySegments(MAP_CAPACITY); - this.intMapByEcSegments = new IntMap.IntMapByEcSegment(); + this.intMapByDynamicHashNonCap = new IntMapByDynamicHash(); + this.intMapByDynamicHash = new IntMapByDynamicHash(MAP_CAPACITY); } /** @@ -89,7 +93,7 @@ int next() { @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfConcurrentHashMapWithNoneInitCap(ThreadState state) { - int key = state.next() & (MAP_CAPACITY - 1); + int key = state.next(); if (!this.concurrentHashMapNonCap.containsKey(key)) { this.concurrentHashMapNonCap.put(key, state.next()); } @@ -99,7 +103,7 @@ 
public void randomGetPutOfConcurrentHashMapWithNoneInitCap(ThreadState state) { @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfConcurrentHashMapWithInitCap(ThreadState state) { - int key = state.next() & (MAP_CAPACITY - 1); + int key = state.next(); if (!this.concurrentHashMap.containsKey(key)) { this.concurrentHashMap.put(key, state.next()); } @@ -118,12 +122,22 @@ public void randomGetPutOfIntMapBySegments(ThreadState state) { @Benchmark @Threads(THREAD_COUNT) - public void randomGetPutOfIntMapByEcSegment(ThreadState state) { - int key = state.next() & (MAP_CAPACITY - 1); - if (!this.intMapByEcSegments.containsKey(key)) { - this.intMapByEcSegments.put(key, state.next()); + public void randomGetPutOfIntMapByDynamicHashWithNoneCap(ThreadState state) { + int key = state.next(); + if (!this.intMapByDynamicHashNonCap.containsKey(key)) { + this.intMapByDynamicHashNonCap.put(key, state.next()); + } + this.intMapByDynamicHashNonCap.get(key); + } + + @Benchmark + @Threads(THREAD_COUNT) + public void randomGetPutOfIntMapByDynamicHash(ThreadState state) { + int key = state.next(); + if (!this.intMapByDynamicHash.containsKey(key)) { + this.intMapByDynamicHash.put(key, state.next()); } - this.intMapByEcSegments.get(key); + this.intMapByDynamicHash.get(key); } public static void main(String[] args) throws RunnerException { From 44d03e8368c8c8e8b566391fbb11436905f168bb Mon Sep 17 00:00:00 2001 From: conghuhu Date: Tue, 5 Dec 2023 14:43:55 +0800 Subject: [PATCH 2/5] feat: add values & keys in IntMapByDynamicHash --- .../util/collection/IntMapByDynamicHash.java | 127 ++++++++++++++++-- .../unit/util/collection/IntMapTest.java | 33 ++++- .../benchmark/BenchmarkConstants.java | 2 +- .../map/MapRandomGetPutThroughputTest.java | 14 +- 4 files changed, 156 insertions(+), 20 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java index 2849c63b42..b233c24225 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java @@ -17,6 +17,9 @@ package org.apache.hugegraph.util.collection; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -92,8 +95,11 @@ private static int tableSizeFor(int c) { return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } + /** + * updated via atomic field updater + */ @SuppressWarnings("UnusedDeclaration") - private volatile int size; // updated via atomic field updater + private volatile int size; public IntMapByDynamicHash() { this(DEFAULT_INITIAL_CAPACITY); @@ -110,10 +116,8 @@ public IntMapByDynamicHash(int initialCapacity) { int cap = (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size); if (cap >= PARTITIONED_SIZE_THRESHOLD) { - /* - we want 7 extra slots and 64 bytes for each - slot. int is 4 bytes, so 64 bytes is 16 ints. - */ + // we want 7 extra slots and 64 bytes for each + // slot. int is 4 bytes, so 64 bytes is 16 ints. 
this.partitionedSize = new int[SIZE_BUCKETS * 16]; } @@ -293,14 +297,12 @@ public boolean containsKey(int key) { @Override public IntIterator keys() { - // TODO impl - return null; + return new KeyIterator(); } @Override public IntIterator values() { - // TODO impl - return null; + return new ValueIterator(); } @Override @@ -755,6 +757,113 @@ public String toString() { } } + /* ---------------- Iterator -------------- */ + + private static final class IteratorState { + private Entry[] currentTable; + private int start; + private int end; + + private IteratorState(Entry[] currentTable) { + this.currentTable = currentTable; + this.end = this.currentTable.length - 1; + } + + private IteratorState(Entry[] currentTable, int start, int end) { + this.currentTable = currentTable; + this.start = start; + this.end = end; + } + } + + private abstract class HashIterator implements IntIterator { + private List todo; + private IteratorState currentState; + private Entry next; + private int index; + + protected HashIterator() { + this.currentState = new IteratorState(IntMapByDynamicHash.this.table); + this.findNext(); + } + + private void findNext() { + while (this.index < this.currentState.end) { + Entry o = + (Entry) IntMapByDynamicHash.tableAt(this.currentState.currentTable, this.index); + if (o == RESIZED || o == RESIZING) { + Entry[] nextArray = + IntMapByDynamicHash.this.helpWithResizeWhileCurrentIndex( + this.currentState.currentTable, this.index); + int endResized = this.index + 1; + while (endResized < this.currentState.end) { + if (IntMapByDynamicHash.tableAt(this.currentState.currentTable, + endResized) != RESIZED) { + break; + } + endResized++; + } + if (this.todo == null) { + this.todo = new ArrayList<>(4); + } + if (endResized < this.currentState.end) { + this.todo.add(new IteratorState( + this.currentState.currentTable, endResized, this.currentState.end)); + } + int powerTwoLength = this.currentState.currentTable.length - 1; + this.todo.add(new 
IteratorState(nextArray, this.index + powerTwoLength, + endResized + powerTwoLength)); + this.currentState.currentTable = nextArray; + this.currentState.end = endResized; + this.currentState.start = this.index; + } else if (o != null) { + this.next = o; + this.index++; + break; + } else { + this.index++; + } + } + if (this.next == null && this.index == this.currentState.end && this.todo != null && + !this.todo.isEmpty()) { + this.currentState = this.todo.remove(this.todo.size() - 1); + this.index = this.currentState.start; + this.findNext(); + } + } + + @Override + public final boolean hasNext() { + return this.next != null; + } + + final Entry nextEntry() { + Entry e = this.next; + if (e == null) { + throw new NoSuchElementException(); + } + + if ((this.next = e.getNext()) == null) { + this.findNext(); + } + return e; + } + } + + private final class ValueIterator extends HashIterator { + @Override + public int next() { + return this.nextEntry().getValue(); + } + } + + private final class KeyIterator extends HashIterator { + @Override + public int next() { + return this.nextEntry().getKey(); + } + } + /* ---------------- Unsafe mechanics -------------- */ private static final Unsafe UNSAFE = IntSet.UNSAFE; private static final long ENTRY_ARRAY_BASE; diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java index 4d1dfe703d..29755bc718 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/collection/IntMapTest.java @@ -33,7 +33,6 @@ import org.apache.hugegraph.util.collection.IntIterator; import org.apache.hugegraph.util.collection.IntMap; import org.apache.hugegraph.util.collection.IntMapByDynamicHash; -import 
org.apache.hugegraph.util.collection.IntMapByDynamicHash2; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.junit.After; import org.junit.Before; @@ -420,7 +419,7 @@ public void testIntFixedMapBySegmentsValuesWithMultiSegs() { @Test public void testIntMapByDynamicHashSingleThread() { - IntMapByDynamicHash2 map = new IntMapByDynamicHash2(); + IntMap map = new IntMapByDynamicHash(); int mapSize = 2000; for (int i = 0; i < mapSize; i++) { map.put(i, i + 1); @@ -443,7 +442,7 @@ public void testIntMapByDynamicHashSingleThread() { @Test public void testIntMapByDynamicHashMultiThread() throws InterruptedException { - IntMapByDynamicHash2 map = new IntMapByDynamicHash2(); + IntMap map = new IntMapByDynamicHash(); //int cpus = IntSet.CPUS; int cpus = 16; @@ -490,6 +489,34 @@ protected void afterExecute(Runnable r, Throwable t) { Assert.assertEquals(size.get(), map.size()); } + @Test + public void testIntMapByDynamicHashKeys() { + IntMap map = new IntMapByDynamicHash(); + for (int i = 0; i < 10000; i++) { + map.put(i, i + 100); + } + IntIterator iterator = map.keys(); + for (int i = 0; i < 10000; i++) { + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(i, iterator.next()); + } + Assert.assertFalse(iterator.hasNext()); + } + + @Test + public void testIntMapByDynamicHashValues() { + IntMap map = new IntMapByDynamicHash(); + for (int i = 0; i < 10000; i++) { + map.put(i, i + 100); + } + IntIterator iterator = map.values(); + for (int i = 0; i < 10000; i++) { + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(i + 100, iterator.next()); + } + Assert.assertFalse(iterator.hasNext()); + } + private IntMap fixed(int capacity) { return new IntMap.IntMapByFixedAddr(capacity); } diff --git a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/BenchmarkConstants.java b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/BenchmarkConstants.java index 1525e8143e..1641bc95c9 100644 --- 
a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/BenchmarkConstants.java +++ b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/BenchmarkConstants.java @@ -19,5 +19,5 @@ public class BenchmarkConstants { - public static String OUTPUT_PATH = "./hugegraph-test/target/"; + public static String OUTPUT_PATH = "./hugegraph-server/hugegraph-test/target/"; } diff --git a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java index c1f04fa733..b22e83224a 100644 --- a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java +++ b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java @@ -58,11 +58,11 @@ public class MapRandomGetPutThroughputTest { private ConcurrentHashMap concurrentHashMap; - private IntMap.IntMapBySegments intMapBySegments; + private IntMap intMapBySegments; - private IntMapByDynamicHash intMapByDynamicHashNonCap; + private IntMap intMapByDynamicHashNonCap; - private IntMapByDynamicHash intMapByDynamicHash; + private IntMap intMapByDynamicHash; private static final int THREAD_COUNT = 8; @@ -103,7 +103,7 @@ public void randomGetPutOfConcurrentHashMapWithNoneInitCap(ThreadState state) { @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfConcurrentHashMapWithInitCap(ThreadState state) { - int key = state.next(); + int key = state.next() & (MAP_CAPACITY - 1); if (!this.concurrentHashMap.containsKey(key)) { this.concurrentHashMap.put(key, state.next()); } @@ -112,7 +112,7 @@ public void randomGetPutOfConcurrentHashMapWithInitCap(ThreadState state) { @Benchmark @Threads(THREAD_COUNT) - public void randomGetPutOfIntMapBySegments(ThreadState state) { + public void 
randomGetPutOfIntMapBySegmentsWithInitCap(ThreadState state) { int key = state.next() & (MAP_CAPACITY - 1); if (!this.intMapBySegments.containsKey(key)) { this.intMapBySegments.put(key, state.next()); @@ -132,8 +132,8 @@ public void randomGetPutOfIntMapByDynamicHashWithNoneCap(ThreadState state) { @Benchmark @Threads(THREAD_COUNT) - public void randomGetPutOfIntMapByDynamicHash(ThreadState state) { - int key = state.next(); + public void randomGetPutOfIntMapByDynamicHashWithInitCap(ThreadState state) { + int key = state.next() & (MAP_CAPACITY - 1); if (!this.intMapByDynamicHash.containsKey(key)) { this.intMapByDynamicHash.put(key, state.next()); } From b8ed01680f464f4b0eb31ac7e557e4a4f3469b17 Mon Sep 17 00:00:00 2001 From: imbajin Date: Wed, 6 Dec 2023 14:20:08 +0800 Subject: [PATCH 3/5] add some basic comment & fix some style --- .../util/collection/IntMapByDynamicHash.java | 220 ++++++++++++++---- 1 file changed, 170 insertions(+), 50 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java index b233c24225..c974717733 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java @@ -25,6 +25,12 @@ import sun.misc.Unsafe; +/** + * This class implements a concurrent hash map specifically designed for integer keys and values. + * It uses low-level programming techniques such as direct memory access via `sun.misc.Unsafe` to + * achieve high performance. + * The class is part of the Apache HugeGraph project. 
+ */ public class IntMapByDynamicHash implements IntMap { private static final int DEFAULT_INITIAL_CAPACITY = 16; @@ -44,8 +50,7 @@ public class IntMapByDynamicHash implements IntMap { private static final AtomicReferenceFieldUpdater TABLE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(IntMapByDynamicHash.class, Entry[].class, - "table"); + AtomicReferenceFieldUpdater.newUpdater(IntMapByDynamicHash.class, Entry[].class, "table"); private volatile Entry[] table; @@ -60,24 +65,41 @@ public class IntMapByDynamicHash implements IntMap { private static final Entry RESIZE_SENTINEL = new Entry(NULL_VALUE, NULL_VALUE, 3); /** - * must be 2^n - 1 + * must be (2^n) - 1 */ private static final int SIZE_BUCKETS = 7; - - /* ---------------- Table element access -------------- */ - private static Object tableAt(Object[] array, int index) { - return UNSAFE.getObjectVolatile(array, - ((long) index << ENTRY_ARRAY_SHIFT) + - ENTRY_ARRAY_BASE); + /** + * Constructor for the IntMapByDynamicHash class. + * + * @param initialCapacity the initial capacity of the map. + */ + public IntMapByDynamicHash(int initialCapacity) { + if (initialCapacity < 0) { + throw new IllegalArgumentException("Illegal Initial Capacity: " + initialCapacity); + } + if (initialCapacity > MAXIMUM_CAPACITY) { + initialCapacity = MAXIMUM_CAPACITY; + } + long size = (long) (1.0 + (long) initialCapacity / LOAD_FACTOR); + int cap = (size >= (long) MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int) size); + if (cap >= PARTITIONED_SIZE_THRESHOLD) { + // we want 7 extra slots, and 64 bytes for each slot int are 4 bytes, + // so 64 bytes are 16 ints. 
+ this.partitionedSize = + new int[SIZE_BUCKETS * 16]; + } + // The end index is for resizeContainer + this.table = new Entry[cap + 1]; } - private static boolean casTableAt(Object[] array, int index, Object expected, Object newValue) { - return UNSAFE.compareAndSwapObject( - array, - ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, - expected, - newValue); + /** + * Default constructor for the IntMapByDynamicHash class. + * Initializes the map with the default initial capacity. + */ + public IntMapByDynamicHash() { + this(DEFAULT_INITIAL_CAPACITY); } private static void setTableAt(Object[] array, int index, Object newValue) { @@ -101,30 +123,26 @@ private static int tableSizeFor(int c) { @SuppressWarnings("UnusedDeclaration") private volatile int size; - public IntMapByDynamicHash() { - this(DEFAULT_INITIAL_CAPACITY); + /* ---------------- Table element access -------------- */ + private static Object tableAt(Object[] array, int index) { + return UNSAFE.getObjectVolatile(array, + ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE); } - public IntMapByDynamicHash(int initialCapacity) { - if (initialCapacity < 0) { - throw new IllegalArgumentException("Illegal Initial Capacity: " + initialCapacity); - } - if (initialCapacity > MAXIMUM_CAPACITY) { - initialCapacity = MAXIMUM_CAPACITY; - } - long size = (long) (1.0 + (long) initialCapacity / LOAD_FACTOR); - int cap = (size >= (long) MAXIMUM_CAPACITY) ? - MAXIMUM_CAPACITY : tableSizeFor((int) size); - if (cap >= PARTITIONED_SIZE_THRESHOLD) { - // we want 7 extra slots and 64 bytes for each - // slot. int is 4 bytes, so 64 bytes is 16 ints. 
- this.partitionedSize = - new int[SIZE_BUCKETS * 16]; - } - // The end index is for resizeContainer - this.table = new Entry[cap + 1]; + private static boolean casTableAt(Object[] array, int index, Object expected, Object newValue) { + return UNSAFE.compareAndSwapObject(array, + ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, + expected, newValue); } + /** + * Puts a key-value pair into the map. If the key already exists in the map, its value is + * updated. + * + * @param key the key to be put into the map. + * @param value the value to be associated with the key. + * @return true if the operation is successful. + */ @Override public boolean put(int key, int value) { int hash = this.hash(key); @@ -143,6 +161,16 @@ public boolean put(int key, int value) { return true; } + /** + * This method is used when the normal put operation fails due to a hash collision. + * It searches for the key in the chain and if found, replaces the entry. + * If the key is not found, it adds a new entry. + * + * @param key the key to be put into the map. + * @param value the value to be associated with the key. + * @param currentTable the current table where the key-value pair is to be put. + * @return the old value if the key is already present in the map, otherwise NULL_VALUE. + */ private int slowPut(int key, int value, Entry[] currentTable) { int length; int index; @@ -189,6 +217,13 @@ private int slowPut(int key, int value, Entry[] currentTable) { } } + /** + * Retrieves the value associated with the given key from the map. + * + * @param key the key whose associated value is to be returned. + * @return the value associated with the given key, or NULL_VALUE if the key does not exist + * in the map. 
+ */ @Override public int get(int key) { int hash = this.hash(key); @@ -199,6 +234,7 @@ public int get(int key) { } for (Entry e = o; e != null; e = e.getNext()) { int k; + // TODO: check why key == k is always false if ((k = e.getKey()) == key || key == k) { return e.value; } @@ -206,6 +242,15 @@ public int get(int key) { return NULL_VALUE; } + /** + * This method is used when the normal get operation fails due to a hash collision. + * It searches for the key in the chain and returns the associated value if found. + * + * @param key the key whose associated value is to be returned. + * @param currentArray the current table where the key-value pair is located. + * @return the value associated with the given key, or NULL_VALUE if the key does not exist + * in the map. + */ private int slowGet(int key, Entry[] currentArray) { while (true) { int length = currentArray.length; @@ -227,6 +272,12 @@ private int slowGet(int key, Entry[] currentArray) { } } + /** + * Removes the key-value pair with the given key from the map. + * + * @param key the key whose associated key-value pair is to be removed. + * @return true if the key-value pair was found and removed, false otherwise. + */ @Override public boolean remove(int key) { int hash = this.hash(key); @@ -252,6 +303,14 @@ public boolean remove(int key) { return false; } + /** + * This method is used when the normal remove operation fails due to a hash collision. + * It searches for the key in the chain and if found, removes the entry. + * + * @param key the key whose associated key-value pair is to be removed. + * @param currentTable the current table where the key-value pair is located. + * @return the removed entry if the key is found, otherwise null. 
+ */ private Entry slowRemove(int key, Entry[] currentTable) { int length; int index; @@ -283,13 +342,19 @@ private Entry slowRemove(int key, Entry[] currentTable) { } if (prev != null) { - // Key not found + // Key not found return null; } } } } + /** + * Checks if the map contains a key-value pair with the given key. + * + * @param key the key to be checked. + * @return true if the map contains a key-value pair with the given key, false otherwise. + */ @Override public boolean containsKey(int key) { return this.getEntry(key) != null; @@ -305,6 +370,9 @@ public IntIterator values() { return new ValueIterator(); } + /** + * Removes all the mappings from this map. The map will be empty after this call returns. + */ @Override public void clear() { Entry[] currentArray = this.table; @@ -314,9 +382,9 @@ public void clear() { for (int i = 0; i < currentArray.length - 1; i++) { Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, i); if (o == RESIZED || o == RESIZING) { - resizeContainer = (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, - currentArray.length - - 1); + resizeContainer = + (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, + currentArray.length - 1); } else if (o != null) { Entry e = o; if (IntMapByDynamicHash.casTableAt(currentArray, i, o, null)) { @@ -421,8 +489,7 @@ private void incrementLocalSize(int value) { } } - private Entry createReplacementChainForRemoval(Entry original, - Entry toRemove) { + private Entry createReplacementChainForRemoval(Entry original, Entry toRemove) { if (original == toRemove) { return original.getNext(); } @@ -465,7 +532,15 @@ private void resize(Entry[] oldTable) { this.resize(oldTable, (oldTable.length - 1 << 1) + 1); } - // newSize must be a power of 2 + 1 + /** + * Resizes the map to a new capacity. This method is called when the map's size exceeds its + * threshold. It creates a new array with the new capacity and transfers all entries from the + * old array to the new one. 
+ * Note: newSize must be a power of 2 + 1 + * + * @param oldTable The old table to resize. + * @param newSize The new size for the table. + */ @SuppressWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") private void resize(Entry[] oldTable, int newSize) { int oldCapacity = oldTable.length; @@ -514,7 +589,15 @@ private void resize(Entry[] oldTable, int newSize) { } /** - * Transfer all entries from src to dest tables + * Transfers all entries from the source table to the destination table. This method is + * called during the resize operation. It iterates over the source table and for each non-null + * entry, it copies the entry to the destination table. If the entry in the source table is + * marked as RESIZED or RESIZING, it helps with the resize operation. + * After all entries are transferred, it notifies the ResizeContainer that the resize operation + * is done. + * + * @param src The source table from which entries are to be transferred. + * @param resizeContainer The container that holds the state of the resize operation. */ private void transfer(Entry[] src, ResizeContainer resizeContainer) { Entry[] dest = resizeContainer.nextArray; @@ -531,6 +614,7 @@ private void transfer(Entry[] src, ResizeContainer resizeContainer) { this location to the new array. This means that the elements in the current position have already been processed and do not need to be migrated again. */ + // TODO: use -ResizeContainer.QUEUE_INCREMENT directly? 
j = (j & ~(ResizeContainer.QUEUE_INCREMENT - 1)) + ResizeContainer.QUEUE_INCREMENT; /* When there is only one thread for expansion, there is no concurrency issue @@ -560,8 +644,7 @@ private void transfer(Entry[] src, ResizeContainer resizeContainer) { */ private Entry[] helpWithResize(Entry[] currentArray) { ResizeContainer resizeContainer = - (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, - currentArray.length - 1); + (ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, currentArray.length - 1); Entry[] newTable = resizeContainer.nextArray; if (resizeContainer.getQueuePosition() > ResizeContainer.QUEUE_INCREMENT) { resizeContainer.incrementResizer(); @@ -571,6 +654,15 @@ private Entry[] helpWithResize(Entry[] currentArray) { return newTable; } + /** + * Transfers entries from the old table to the new table in reverse order. This method is used + * to help the resize operation by spreading the work among multiple threads. Each thread + * transfers a portion of the entries from the end of the old table to the beginning of the new + * table. + * + * @param src The old table to transfer entries from. + * @param resizeContainer The container that holds the state of the resize operation. + */ private void reverseTransfer(Entry[] src, ResizeContainer resizeContainer) { Entry[] dest = resizeContainer.nextArray; while (resizeContainer.getQueuePosition() > 0) { @@ -605,6 +697,14 @@ private void reverseTransfer(Entry[] src, ResizeContainer resizeContainer) { } } + /** + * Copies an entry from the old table to the new table. This method is called during the resize + * operation. It does not check if the entry already exists in the new table, so it should only + * be called with entries that are not in the new table yet. + * + * @param dest The new table to copy the entry to. + * @param toCopyEntry The entry to copy. 
+ */ private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { Entry[] currentArray = dest; while (true) { @@ -612,9 +712,9 @@ private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { int index = this.hash(toCopyEntry.getKey(), length); Entry o = (Entry) IntMapByDynamicHash.tableAt(currentArray, index); if (o == RESIZED || o == RESIZING) { - currentArray = ((ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, - length - - 1)).nextArray; + currentArray = + ((ResizeContainer) IntMapByDynamicHash.tableAt(currentArray, + length - 1)).nextArray; } else { Entry newEntry; if (o == null) { @@ -624,8 +724,7 @@ private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { newEntry = new Entry(toCopyEntry.getKey(), toCopyEntry.getValue()); } } else { - newEntry = - new Entry(toCopyEntry.getKey(), toCopyEntry.getValue(), o); + newEntry = new Entry(toCopyEntry.getKey(), toCopyEntry.getValue(), o); } if (IntMapByDynamicHash.casTableAt(currentArray, index, o, newEntry)) { return; @@ -634,7 +733,14 @@ private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { } } + /** + * The ResizeContainer class is used to hold the state of the resize operation. + * It contains the new array to which entries are transferred, the number of threads + * participating in the resize operation, and the position in the old array from which + * entries are transferred. + */ private static final class ResizeContainer extends Entry { + private static final int QUEUE_INCREMENT = Math.min(1 << 10, Integer.highestOneBit(Runtime.getRuntime().availableProcessors()) << 4); @@ -776,6 +882,12 @@ private IteratorState(Entry[] currentTable, int start, int end) { } } + /** + * The HashIterator class is an abstract base class for iterators over the map. + * It maintains the current state of the iteration, which includes the current table + * being iterated over and the index of the next entry to be returned. 
+ * The findNext() method is used to advance the iterator to the next entry. + */ private abstract class HashIterator implements IntIterator { private List todo; private IteratorState currentState; @@ -787,6 +899,14 @@ protected HashIterator() { this.findNext(); } + /** + * This method is used to advance the iterator to the next entry. + * It iterates over the entries in the current table from the current index + * until it finds a non-null entry. If it encounters a RESIZED or RESIZING entry, + * it helps with the resize operation and continues the iteration in the new table. + * If it reaches the end of the current table and there are still tables left to be + * iterated over, it switches to the next table. + */ private void findNext() { while (this.index < this.currentState.end) { Entry o = From 375e29d4f0a41b5b94f6d3ca9afe6c5b4fe0e845 Mon Sep 17 00:00:00 2001 From: conghuhu Date: Wed, 6 Dec 2023 14:23:33 +0800 Subject: [PATCH 4/5] feat: fix pr review --- .../util/collection/IntMapByDynamicHash.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java index b233c24225..a04eeaf440 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java @@ -54,10 +54,16 @@ public class IntMapByDynamicHash implements IntMap { */ private int[] partitionedSize; - private static final Entry RESIZING = new Entry(NULL_VALUE, NULL_VALUE, 1); - private static final Entry RESIZED = new Entry(NULL_VALUE, NULL_VALUE, 2); + /** + * updated via atomic field updater + */ + @SuppressWarnings("UnusedDeclaration") + private volatile int size; + + private static final Entry 
RESIZING = new Entry(NULL_VALUE, NULL_VALUE, (byte) 1); + private static final Entry RESIZED = new Entry(NULL_VALUE, NULL_VALUE, (byte) 2); - private static final Entry RESIZE_SENTINEL = new Entry(NULL_VALUE, NULL_VALUE, 3); + private static final Entry RESIZE_SENTINEL = new Entry(NULL_VALUE, NULL_VALUE, (byte) 3); /** * must be 2^n - 1 @@ -72,12 +78,10 @@ private static Object tableAt(Object[] array, int index) { ENTRY_ARRAY_BASE); } - private static boolean casTableAt(Object[] array, int index, Object expected, Object newValue) { + private static boolean casTableAt(Object[] array, int idx, Object e, Object v) { return UNSAFE.compareAndSwapObject( array, - ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, - expected, - newValue); + ((long) idx << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, e, v); } private static void setTableAt(Object[] array, int index, Object newValue) { @@ -95,12 +99,6 @@ private static int tableSizeFor(int c) { return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } - /** - * updated via atomic field updater - */ - @SuppressWarnings("UnusedDeclaration") - private volatile int size; - public IntMapByDynamicHash() { this(DEFAULT_INITIAL_CAPACITY); } @@ -637,13 +635,13 @@ private void unconditionalCopy(Entry[] dest, Entry toCopyEntry) { private static final class ResizeContainer extends Entry { private static final int QUEUE_INCREMENT = Math.min(1 << 10, - Integer.highestOneBit(Runtime.getRuntime().availableProcessors()) << 4); + Integer.highestOneBit(IntSet.CPUS) << 4); private final AtomicInteger resizers = new AtomicInteger(1); private final Entry[] nextArray; private final AtomicInteger queuePosition; private ResizeContainer(Entry[] nextArray, int oldSize) { - super(NULL_VALUE, NULL_VALUE, 4); + super(NULL_VALUE, NULL_VALUE, (byte) 4); this.nextArray = nextArray; this.queuePosition = new AtomicInteger(oldSize); } @@ -689,7 +687,7 @@ public void waitForAllResizers() { try { this.wait(); } catch (InterruptedException e) { - 
//ginore + // ignore } } } @@ -706,6 +704,7 @@ public void zeroOutQueuePosition() { } private static class Entry { + final int key; volatile int value; volatile Entry next; @@ -715,11 +714,11 @@ private static class Entry { * 1 RESIZING * 2 RESIZED * 3 RESIZE_SENTINEL - * 4 ResizeContainer + * 4 RESIZE_CONTAINER */ - final int state; + final byte state; - public Entry(int key, int value, int state) { + public Entry(int key, int value, byte state) { this.key = key; this.value = value; this.state = state; @@ -760,6 +759,7 @@ public String toString() { /* ---------------- Iterator -------------- */ private static final class IteratorState { + private Entry[] currentTable; private int start; private int end; @@ -777,6 +777,7 @@ private IteratorState(Entry[] currentTable, int start, int end) { } private abstract class HashIterator implements IntIterator { + private List todo; private IteratorState currentState; private Entry next; From 326de034f01a44916e715839e29cb314898f9233 Mon Sep 17 00:00:00 2001 From: conghuhu Date: Wed, 6 Dec 2023 19:44:47 +0800 Subject: [PATCH 5/5] fix: fix some review --- .../util/collection/IntMapByDynamicHash.java | 12 +++-- .../map/MapRandomGetPutThroughputTest.java | 50 +++++++++---------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java index 429a8f75eb..a86def7f13 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/collection/IntMapByDynamicHash.java @@ -124,15 +124,17 @@ private static int tableSizeFor(int c) { } /* ---------------- Table element access -------------- */ + + private static long entryOffset(int index) { + return ((long) index << ENTRY_ARRAY_SHIFT) + 
ENTRY_ARRAY_BASE; + } + private static Object tableAt(Object[] array, int index) { - return UNSAFE.getObjectVolatile(array, - ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE); + return UNSAFE.getObjectVolatile(array, entryOffset(index)); } private static boolean casTableAt(Object[] array, int index, Object expected, Object newValue) { - return UNSAFE.compareAndSwapObject(array, - ((long) index << ENTRY_ARRAY_SHIFT) + ENTRY_ARRAY_BASE, - expected, newValue); + return UNSAFE.compareAndSwapObject(array, entryOffset(index), expected, newValue); } /** diff --git a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java index b22e83224a..4423351b52 100644 --- a/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java +++ b/hugegraph-server/hugegraph-test/src/test/java/org/apache/hugegraph/benchmark/map/MapRandomGetPutThroughputTest.java @@ -54,15 +54,15 @@ public class MapRandomGetPutThroughputTest { @Param(value = {"1000", "10000", "100000", "1000000"}) private int MAP_CAPACITY; - private ConcurrentHashMap concurrentHashMapNonCap; + private ConcurrentHashMap concurrentHashMapWithoutCap; - private ConcurrentHashMap concurrentHashMap; + private ConcurrentHashMap concurrentHashMapWithCap; - private IntMap intMapBySegments; + private IntMap intMapBySegmentsWithCap; - private IntMap intMapByDynamicHashNonCap; + private IntMap intMapByDynamicHashWithoutCap; - private IntMap intMapByDynamicHash; + private IntMap intMapByDynamicHashWithCap; private static final int THREAD_COUNT = 8; @@ -70,11 +70,11 @@ public class MapRandomGetPutThroughputTest { @Setup(Level.Trial) public void prepareMap() { - this.concurrentHashMapNonCap = new ConcurrentHashMap<>(); - this.concurrentHashMap = new ConcurrentHashMap<>(MAP_CAPACITY); - 
this.intMapBySegments = new IntMap.IntMapBySegments(MAP_CAPACITY); - this.intMapByDynamicHashNonCap = new IntMapByDynamicHash(); - this.intMapByDynamicHash = new IntMapByDynamicHash(MAP_CAPACITY); + this.concurrentHashMapWithoutCap = new ConcurrentHashMap<>(); + this.concurrentHashMapWithCap = new ConcurrentHashMap<>(MAP_CAPACITY); + this.intMapBySegmentsWithCap = new IntMap.IntMapBySegments(MAP_CAPACITY); + this.intMapByDynamicHashWithoutCap = new IntMapByDynamicHash(); + this.intMapByDynamicHashWithCap = new IntMapByDynamicHash(MAP_CAPACITY); } /** @@ -94,50 +94,50 @@ int next() { @Threads(THREAD_COUNT) public void randomGetPutOfConcurrentHashMapWithNoneInitCap(ThreadState state) { int key = state.next(); - if (!this.concurrentHashMapNonCap.containsKey(key)) { - this.concurrentHashMapNonCap.put(key, state.next()); + if (!this.concurrentHashMapWithoutCap.containsKey(key)) { + this.concurrentHashMapWithoutCap.put(key, state.next()); } - this.concurrentHashMapNonCap.get(key); + this.concurrentHashMapWithoutCap.get(key); } @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfConcurrentHashMapWithInitCap(ThreadState state) { int key = state.next() & (MAP_CAPACITY - 1); - if (!this.concurrentHashMap.containsKey(key)) { - this.concurrentHashMap.put(key, state.next()); + if (!this.concurrentHashMapWithCap.containsKey(key)) { + this.concurrentHashMapWithCap.put(key, state.next()); } - this.concurrentHashMap.get(key); + this.concurrentHashMapWithCap.get(key); } @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfIntMapBySegmentsWithInitCap(ThreadState state) { int key = state.next() & (MAP_CAPACITY - 1); - if (!this.intMapBySegments.containsKey(key)) { - this.intMapBySegments.put(key, state.next()); + if (!this.intMapBySegmentsWithCap.containsKey(key)) { + this.intMapBySegmentsWithCap.put(key, state.next()); } - this.intMapBySegments.get(key); + this.intMapBySegmentsWithCap.get(key); } @Benchmark @Threads(THREAD_COUNT) public void 
randomGetPutOfIntMapByDynamicHashWithNoneCap(ThreadState state) { int key = state.next(); - if (!this.intMapByDynamicHashNonCap.containsKey(key)) { - this.intMapByDynamicHashNonCap.put(key, state.next()); + if (!this.intMapByDynamicHashWithoutCap.containsKey(key)) { + this.intMapByDynamicHashWithoutCap.put(key, state.next()); } - this.intMapByDynamicHashNonCap.get(key); + this.intMapByDynamicHashWithoutCap.get(key); } @Benchmark @Threads(THREAD_COUNT) public void randomGetPutOfIntMapByDynamicHashWithInitCap(ThreadState state) { int key = state.next() & (MAP_CAPACITY - 1); - if (!this.intMapByDynamicHash.containsKey(key)) { - this.intMapByDynamicHash.put(key, state.next()); + if (!this.intMapByDynamicHashWithCap.containsKey(key)) { + this.intMapByDynamicHashWithCap.put(key, state.next()); } - this.intMapByDynamicHash.get(key); + this.intMapByDynamicHashWithCap.get(key); } public static void main(String[] args) throws RunnerException {