diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index f9c246551..97ca10ba4 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -35,9 +35,9 @@ public class AsyncIOThread extends WorkThread { private final Consumer bufferConsumer; - private final Queue commandQueue = Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); + private final Queue commandQueue = new ConcurrentLinkedQueue<>(); - private final Queue> registerQueue = Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); + private final Queue> registerQueue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(); diff --git a/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java b/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java deleted file mode 100644 index 574f0e372..000000000 --- a/src/main/java/org/redkale/util/MpscChunkedArrayQueue.java +++ /dev/null @@ -1,731 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.util; - -import java.util.*; -import java.util.function.Supplier; - -/** - * 参考 https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscChunkedArrayQueue.java version: v3.3.0实现的MPSC队列 - * - *

- * 详情见: https://redkale.org - * - * @param 泛型 - * - * @author zhangjx - * @since 2.5.0 - */ -public class MpscChunkedArrayQueue extends AbstractQueue { - - byte b000, b001, b002, b003, b004, b005, b006, b007;// 8b - - byte b010, b011, b012, b013, b014, b015, b016, b017;// 16b - - byte b020, b021, b022, b023, b024, b025, b026, b027;// 24b - - byte b030, b031, b032, b033, b034, b035, b036, b037;// 32b - - byte b040, b041, b042, b043, b044, b045, b046, b047;// 40b - - byte b050, b051, b052, b053, b054, b055, b056, b057;// 48b - - byte b060, b061, b062, b063, b064, b065, b066, b067;// 56b - - byte b070, b071, b072, b073, b074, b075, b076, b077;// 64b - - byte b100, b101, b102, b103, b104, b105, b106, b107;// 72b - - byte b110, b111, b112, b113, b114, b115, b116, b117;// 80b - - byte b120, b121, b122, b123, b124, b125, b126, b127;// 88b - - byte b130, b131, b132, b133, b134, b135, b136, b137;// 96b - - byte b140, b141, b142, b143, b144, b145, b146, b147;//104b - - byte b150, b151, b152, b153, b154, b155, b156, b157;//112b - - byte b160, b161, b162, b163, b164, b165, b166, b167;//120b - - byte b170, b171, b172, b173, b174, b175, b176, b177;//128b - - //---------------------------------------------- - private static final Unsafe UNSAFE = Utility.unsafe(); - - private static final long REF_ARRAY_BASE; - - private static final int REF_ELEMENT_SHIFT; - - static { - final int scale = UNSAFE == null ? 4 : UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3; - } else { - throw new IllegalStateException("Unknown pointer size: " + scale); - } - REF_ARRAY_BASE = UNSAFE == null ? 0L : UNSAFE.arrayBaseOffset(Object[].class); - } - - // No post padding here, subclasses must add - private static final Object JUMP = new Object(); - - private static final Object BUFFER_CONSUMED = new Object(); - - private static final int CONTINUE_TO_P_INDEX_CAS = 0; - - private static final int RETRY = 1; - - private static final int QUEUE_FULL = 2; - - private static final int QUEUE_RESIZE = 3; - - protected final long maxQueueCapacity; - - private final static long P_LIMIT_OFFSET = fieldOffset(MpscChunkedArrayQueue.class, "producerLimit"); - - private volatile long producerLimit; - - protected long producerMask; - - protected E[] producerBuffer; - - private final static long C_INDEX_OFFSET = fieldOffset(MpscChunkedArrayQueue.class, "consumerIndex"); - - private volatile long consumerIndex; - - protected long consumerMask; - - protected E[] consumerBuffer; - - private final static long P_INDEX_OFFSET = fieldOffset(MpscChunkedArrayQueue.class, "producerIndex"); - - private volatile long producerIndex; - - public MpscChunkedArrayQueue(int maxCapacity) { - this(Math.max(2, Math.min(1024, Utility.roundToPowerOfTwo(maxCapacity / 8))), maxCapacity); - } - - /** - * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size. - * Must be 2 or more. - * @param maxCapacity the maximum capacity will be rounded up to the closest power of 2 and will be the - * upper limit of number of elements in this queue. Must be 4 or more and round up to a larger - * power of 2 than initialCapacity. - */ - public MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) { - if (initialCapacity < 2) throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: >= 2)"); - int p2capacity = Utility.roundToPowerOfTwo(initialCapacity); - // leave lower bit of mask clear - long mask = (p2capacity - 1) << 1; - // need extra element to point at next array - E[] buffer = (E[]) new Object[p2capacity + 1]; - producerBuffer = buffer; - producerMask = mask; - consumerBuffer = buffer; - consumerMask = mask; - soProducerLimit(mask); // we know it's all empty to start with - - if (maxCapacity < 4) throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 4)"); - int p2max = Utility.roundToPowerOfTwo(maxCapacity); - if (Utility.roundToPowerOfTwo(initialCapacity) > p2max) { - throw new IllegalArgumentException("initialCapacity: " + Utility.roundToPowerOfTwo(initialCapacity) + " (expected: <= " + p2max + ")"); - } - maxQueueCapacity = ((long) Utility.roundToPowerOfTwo(maxCapacity)) << 1; - } - - static long fieldOffset(Class clz, String fieldName) throws RuntimeException { - if (UNSAFE == null) return 0L; - try { - return UNSAFE.objectFieldOffset(clz.getDeclaredField(fieldName)); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - static long modifiedCalcCircularRefElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1)); - } - - static void spRefElement(E[] buffer, long offset, E e) { - UNSAFE.putObject(buffer, offset, e); - } - - @SuppressWarnings("unchecked") - static E lpRefElement(E[] buffer, long offset) { - return (E) UNSAFE.getObject(buffer, offset); - } - - /** - * A volatile load of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via - * - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - static E lvRefElement(E[] buffer, long offset) { - return (E) UNSAFE.getObjectVolatile(buffer, offset); - } - - /** - * An ordered store of an element to a given offset - * - * @param E - * @param buffer this.buffer - * @param offset computed via - * @param e an orderly kitty - */ - public static void soRefElement(E[] buffer, long offset, E e) { - UNSAFE.putOrderedObject(buffer, offset, e); - } - - public static long calcRefElementOffset(long index) { - return REF_ARRAY_BASE + (index << REF_ELEMENT_SHIFT); - } - - /** - * Note: circular arrays are assumed a power of 2 in length and the `mask` is (length - 1). - * - * @param index desirable element index - * @param mask (length - 1) - * - * @return the offset in bytes within the circular array for a given index - */ - public static long calcCircularRefElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } - - public final long lvProducerIndex() { - return producerIndex; - } - - final void soProducerIndex(long newValue) { - UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); - } - - final boolean casProducerIndex(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } - - public final long lvConsumerIndex() { - return consumerIndex; - } - - final long lpConsumerIndex() { - return UNSAFE.getLong(this, C_INDEX_OFFSET); - } - - final void soConsumerIndex(long newValue) { - UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); - } - - final long lvProducerLimit() { - return producerLimit; - } - - final boolean casProducerLimit(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue); - } - - final void soProducerLimit(long newValue) { - UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue); - } - - @Override - public int size() { - // NOTE: because indices are on even numbers we cannot use the size util. - - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - long size; - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - size = ((currentProducerIndex - after) >> 1); - break; - } - } - // Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded - // indexed queues. - if (size > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } else { - return (int) size; - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return (this.lvConsumerIndex() == this.lvProducerIndex()); - } - - @Override - public String toString() { - return this.getClass().getName(); - } - - @Override - public boolean offer(final E e) { - if (null == e) throw new NullPointerException(); - - long mask; - E[] buffer; - long pIndex; - - while (true) { - long producerLimit0 = lvProducerLimit(); - pIndex = lvProducerIndex(); - // lower bit is indicative of resize, if we see it we spin until it's cleared - if ((pIndex & 1) == 1) { - continue; - } - // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1) - - // mask/buffer may get changed by resizing -> only use for array access after successful CAS. - mask = this.producerMask; - buffer = this.producerBuffer; - // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex) - - // assumption behind this optimization is that queue is almost always empty or near empty - if (producerLimit0 <= pIndex) { - int result = offerSlowPath(mask, pIndex, producerLimit0); - switch (result) { - case CONTINUE_TO_P_INDEX_CAS: - break; - case RETRY: - continue; - case QUEUE_FULL: - return false; - case QUEUE_RESIZE: - resize(mask, buffer, pIndex, e, null); - return true; - } - } - - if (casProducerIndex(pIndex, pIndex + 2)) { - break; - } - } - // INDEX visible before ELEMENT - final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask); - soRefElement(buffer, offset, e); // release element e - return true; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E poll() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - - final long offset = modifiedCalcCircularRefElementOffset(index, mask); - Object e = lvRefElement(buffer, offset); - if (e == null) { - if (index != lvProducerIndex()) { - // poll() == null iff queue is empty, null element is not strong enough indicator, so we must - // check the producer index. If the queue is indeed not empty we spin until element is - // visible. - do { - e = lvRefElement(buffer, offset); - } while (e == null); - } else { - return null; - } - } - - if (e == JUMP) { - final E[] nextBuffer = nextBuffer(buffer, mask); - return newBufferPoll(nextBuffer, index); - } - - soRefElement(buffer, offset, null); // release element null - soConsumerIndex(index + 2); // release cIndex - return (E) e; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E peek() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - - final long offset = modifiedCalcCircularRefElementOffset(index, mask); - Object e = lvRefElement(buffer, offset); - if (e == null && index != lvProducerIndex()) { - // peek() == null iff queue is empty, null element is not strong enough indicator, so we must - // check the producer index. If the queue is indeed not empty we spin until element is visible. - do { - e = lvRefElement(buffer, offset); - } while (e == null); - } - if (e == JUMP) { - return newBufferPeek(nextBuffer(buffer, mask), index); - } - return (E) e; - } - - /** - * We do not inline resize into this method because we do not resize on fill. - */ - private int offerSlowPath(long mask, long pIndex, long producerLimit) { - final long cIndex = lvConsumerIndex(); - long bufferCapacity = getCurrentBufferCapacity(mask); - - if (cIndex + bufferCapacity > pIndex) { - if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { - // retry from top - return RETRY; - } else { - // continue to pIndex CAS - return CONTINUE_TO_P_INDEX_CAS; - } - } // full and cannot grow - else if (availableInQueue(pIndex, cIndex) <= 0) { - // offer should return false; - return QUEUE_FULL; - } // grab index for resize -> set lower bit - else if (casProducerIndex(pIndex, pIndex + 1)) { - // trigger a resize - return QUEUE_RESIZE; - } else { - // failed resize attempt, retry from top - return RETRY; - } - } - - @SuppressWarnings("unchecked") - private E[] nextBuffer(final E[] buffer, final long mask) { - final long offset = nextArrayOffset(mask); - final E[] nextBuffer = (E[]) lvRefElement(buffer, offset); - consumerBuffer = nextBuffer; - consumerMask = (nextBuffer.length - 2) << 1; - soRefElement(buffer, offset, BUFFER_CONSUMED); - return nextBuffer; - } - - private static long nextArrayOffset(long mask) { - return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE); - } - - private E newBufferPoll(E[] nextBuffer, long index) { - final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask); - final E n = lvRefElement(nextBuffer, offset); - if (n == null) { - throw new IllegalStateException("new buffer must have at least one element"); - } - soRefElement(nextBuffer, offset, null); - soConsumerIndex(index + 2); - return n; - } - - private E newBufferPeek(E[] nextBuffer, long index) { - final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask); - final E n = lvRefElement(nextBuffer, offset); - if (null == n) { - throw new IllegalStateException("new buffer must have at least one element"); - } - return n; - } - - public long currentProducerIndex() { - return lvProducerIndex() / 2; - } - - public long currentConsumerIndex() { - return lvConsumerIndex() / 2; - } - - public boolean relaxedOffer(E e) { - return offer(e); - } - - @SuppressWarnings("unchecked") - public E relaxedPoll() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - - final long offset = modifiedCalcCircularRefElementOffset(index, mask); - Object e = lvRefElement(buffer, offset); - if (e == null) { - return null; - } - if (e == JUMP) { - final E[] nextBuffer = nextBuffer(buffer, mask); - return newBufferPoll(nextBuffer, index); - } - soRefElement(buffer, offset, null); - soConsumerIndex(index + 2); - return (E) e; - } - - @SuppressWarnings("unchecked") - public E relaxedPeek() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - - final long offset = modifiedCalcCircularRefElementOffset(index, mask); - Object e = lvRefElement(buffer, offset); - if (e == JUMP) { - return newBufferPeek(nextBuffer(buffer, mask), index); - } - return (E) e; - } - - public int fill(Supplier s) { - long result = 0;// result is a long because we want to have a safepoint check at regular intervals - final int capacity = capacity(); - do { - final int filled = fill(s, Utility.cpus() * 4); - if (filled == 0) { - return (int) result; - } - result += filled; - } while (result <= capacity); - return (int) result; - } - - public int fill(Supplier s, int limit) { - if (null == s) - throw new IllegalArgumentException("supplier is null"); - if (limit < 0) - throw new IllegalArgumentException("limit is negative:" + limit); - if (limit == 0) - return 0; - - long mask; - E[] buffer; - long pIndex; - int claimedSlots; - while (true) { - long producerLimit0 = lvProducerLimit(); - pIndex = lvProducerIndex(); - // lower bit is indicative of resize, if we see it we spin until it's cleared - if ((pIndex & 1) == 1) { - continue; - } - // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1) - - // NOTE: mask/buffer may get changed by resizing -> only use for array access after successful CAS. - // Only by virtue offloading them between the lvProducerIndex and a successful casProducerIndex are they - // safe to use. - mask = this.producerMask; - buffer = this.producerBuffer; - // a successful CAS ties the ordering, lv(pIndex) -> [mask/buffer] -> cas(pIndex) - - // we want 'limit' slots, but will settle for whatever is visible to 'producerLimit' - long batchIndex = Math.min(producerLimit0, pIndex + 2l * limit); // -> producerLimit >= batchIndex - - if (pIndex >= producerLimit0) { - int result = offerSlowPath(mask, pIndex, producerLimit0); - switch (result) { - case CONTINUE_TO_P_INDEX_CAS: - // offer slow path verifies only one slot ahead, we cannot rely on indication here - case RETRY: - continue; - case QUEUE_FULL: - return 0; - case QUEUE_RESIZE: - resize(mask, buffer, pIndex, null, s); - return 1; - } - } - - // claim limit slots at once - if (casProducerIndex(pIndex, batchIndex)) { - claimedSlots = (int) ((batchIndex - pIndex) / 2); - break; - } - } - - for (int i = 0; i < claimedSlots; i++) { - final long offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask); - soRefElement(buffer, offset, s.get()); - } - return claimedSlots; - } - - /** - * Get an iterator for this queue. This method is thread safe. - *

- * The iterator provides a best-effort snapshot of the elements in the queue. - * The returned iterator is not guaranteed to return elements in queue order, - * and races with the consumer thread may cause gaps in the sequence of returned elements. - * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. - * - * @return The iterator. - */ - @Override - public Iterator iterator() { - return new WeakIterator(consumerBuffer, lvConsumerIndex(), lvProducerIndex()); - } - - private static class WeakIterator implements Iterator { - - private final long pIndex; - - private long nextIndex; - - private E nextElement; - - private E[] currentBuffer; - - private int mask; - - WeakIterator(E[] currentBuffer, long cIndex, long pIndex) { - this.pIndex = pIndex >> 1; - this.nextIndex = cIndex >> 1; - setBuffer(currentBuffer); - nextElement = getNext(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - - @Override - public boolean hasNext() { - return nextElement != null; - } - - @Override - public E next() { - final E e = nextElement; - if (e == null) { - throw new NoSuchElementException(); - } - nextElement = getNext(); - return e; - } - - private void setBuffer(E[] buffer) { - this.currentBuffer = buffer; - this.mask = buffer.length - 2; - } - - private E getNext() { - while (nextIndex < pIndex) { - long index = nextIndex++; - E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask)); - // skip removed/not yet visible elements - if (e == null) { - continue; - } - - // not null && not JUMP -> found next element - if (e != JUMP) { - return e; - } - - // need to jump to the next buffer - int nextBufferIndex = mask + 1; - Object nextBuffer = lvRefElement(currentBuffer, - calcRefElementOffset(nextBufferIndex)); - - if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) { - // Consumer may have passed us, or the next buffer is not visible yet: drop out early - return null; - } - - setBuffer((E[]) nextBuffer); - // now with the new array retry the load, it can't be a JUMP, but we need to repeat same index - e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask)); - // skip removed/not yet visible elements - if (e == null) { - continue; - } else { - return e; - } - - } - return null; - } - } - - private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier s) { - assert (e != null && s == null) || (e == null || s != null); - int newBufferLength = getNextBufferSize(oldBuffer); - final E[] newBuffer; - try { - newBuffer = (E[]) new Object[newBufferLength]; - } catch (OutOfMemoryError oom) { - assert lvProducerIndex() == pIndex + 1; - soProducerIndex(pIndex); - throw oom; - } - - producerBuffer = newBuffer; - final int newMask = (newBufferLength - 2) << 1; - producerMask = newMask; - - final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask); - final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask); - - soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array - soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked - - // ASSERT code - final long cIndex = lvConsumerIndex(); - final long availableInQueue = availableInQueue(pIndex, cIndex); - if (availableInQueue < 0) throw new IllegalArgumentException("availableInQueue: " + availableInQueue + " (expected: > 0)"); - - // Invalidate racing CASs - // We never set the limit beyond the bounds of a buffer - soProducerLimit(pIndex + Math.min(newMask, availableInQueue)); - - // make resize visible to the other producers - soProducerIndex(pIndex + 2); - - // INDEX visible before ELEMENT, consistent with consumer expectation - // make resize visible to consumer - soRefElement(oldBuffer, offsetInOld, JUMP); - } - - protected long availableInQueue(long pIndex, long cIndex) { - return maxQueueCapacity - (pIndex - cIndex); - } - - public int capacity() { - return (int) (maxQueueCapacity / 2); - } - - protected int getNextBufferSize(E[] buffer) { - return buffer.length; - } - - protected long getCurrentBufferCapacity(long mask) { - return mask; - } -} diff --git a/src/main/java/org/redkale/util/MpscGrowableArrayQueue.java b/src/main/java/org/redkale/util/MpscGrowableArrayQueue.java deleted file mode 100644 index 53fdb7fea..000000000 --- a/src/main/java/org/redkale/util/MpscGrowableArrayQueue.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.util; - -/** - * 参考 https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscGrowableArrayQueue.java version: v3.3.0实现的MPSC队列
- * 与基类的区别在于: 每次都会将连接块容量加倍,直到底层的数组可以完全容纳所有的元素。 - *

- * 详情见: https://redkale.org - * - * @param 泛型 - * - * @author zhangjx - * @since 2.5.0 - */ -public class MpscGrowableArrayQueue extends MpscChunkedArrayQueue { - - public MpscGrowableArrayQueue(int maxCapacity) { - super(Math.max(2, Utility.roundToPowerOfTwo(maxCapacity / 8)), maxCapacity); - } - - public MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) { - super(initialCapacity, maxCapacity); - } - - @Override - protected int getNextBufferSize(E[] buffer) { - final long maxSize = maxQueueCapacity / 2; - int len = buffer.length; - //checkLessThanOrEqual - if (len > maxSize) throw new IllegalArgumentException("buffer.length: " + len + " (expected: <= " + maxSize + ")"); - final int newSize = 2 * (len - 1); - return newSize + 1; - } - - @Override - protected long getCurrentBufferCapacity(long mask) { - return (mask + 2 == maxQueueCapacity) ? maxQueueCapacity : mask; - } -} diff --git a/src/main/java/org/redkale/util/NonBlockingHashMap.java b/src/main/java/org/redkale/util/NonBlockingHashMap.java deleted file mode 100644 index 9a00cbf5a..000000000 --- a/src/main/java/org/redkale/util/NonBlockingHashMap.java +++ /dev/null @@ -1,1928 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.util; - -import java.io.*; -import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.*; - -/** - * 参考 https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/maps/NonBlockingHashMap.java version: v3.3.0实现的MPSC队列 - * - *

- * 详情见: https://redkale.org - * - * @param 泛型 - * @param 泛型 - * - * @author zhangjx - * @since 2.5.0 - */ -public class NonBlockingHashMap extends AbstractMap implements ConcurrentMap, Cloneable, Serializable { - - private static final long serialVersionUID = 1234123412341234123L; - - private static final int REPROBE_LIMIT = 10; // Too many reprobes then force a table-resize - - private static final Unsafe UNSAFE = Utility.unsafe(); - // --- Bits to allow Unsafe access to arrays - - private static final int _Obase = UNSAFE == null ? 0 : UNSAFE.arrayBaseOffset(Object[].class); - - private static final int _Oscale = UNSAFE == null ? 0 : UNSAFE.arrayIndexScale(Object[].class); - - private static final int _Olog = _Oscale == 4 ? 2 : (_Oscale == 8 ? 3 : 9999); - - private static long rawIndex(final Object[] ary, final int idx) { - assert idx >= 0 && idx < ary.length; - // Note the long-math requirement, to handle arrays of more than 2^31 bytes - // - or 2^28 - or about 268M - 8-byte pointer elements. - return _Obase + ((long) idx << _Olog); - } - - // --- Setup to use Unsafe - private static final long _kvs_offset = fieldOffset(NonBlockingHashMap.class, "_kvs"); - - static long fieldOffset(Class clz, String fieldName) throws RuntimeException { - if (UNSAFE == null) return 0L; - try { - return UNSAFE.objectFieldOffset(clz.getDeclaredField(fieldName)); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - - static void spRefElement(E[] buffer, long offset, E e) { - UNSAFE.putObject(buffer, offset, e); - } - - private boolean CAS_kvs(final Object[] oldkvs, final Object[] newkvs) { - return UNSAFE.compareAndSwapObject(this, _kvs_offset, oldkvs, newkvs); - } - - // --- Adding a 'prime' bit onto Values via wrapping with a junk wrapper class - private static final class Prime { - - final Object _V; - - Prime(Object V) { - _V = V; - } - - static Object unbox(Object V) { - return V instanceof Prime ? ((Prime) V)._V : V; - } - } - - // --- hash ---------------------------------------------------------------- - // Helper function to spread lousy hashCodes. Throws NPE for null Key, on - // purpose - as the first place to conveniently toss the required NPE for a - // null Key. - private static int hash(final Object key) { - int h = key.hashCode(); // The real hashCode call - h ^= (h >>> 20) ^ (h >>> 12); - h ^= (h >>> 7) ^ (h >>> 4); - h += h << 7; // smear low bits up high, for hashcodes that only differ by 1 - return h; - } - - // --- The Hash Table -------------------- - // Slot 0 is always used for a 'CHM' entry below to hold the interesting - // bits of the hash table. Slot 1 holds full hashes as an array of ints. - // Slots {2,3}, {4,5}, etc hold {Key,Value} pairs. The entire hash table - // can be atomically replaced by CASing the _kvs field. - // - // Why is CHM buried inside the _kvs Object array, instead of the other way - // around? The CHM info is used during resize events and updates, but not - // during standard 'get' operations. I assume 'get' is much more frequent - // than 'put'. 'get' can skip the extra indirection of skipping through the - // CHM to reach the _kvs array. - private transient Object[] _kvs; - - private static final CHM chm(Object[] kvs) { - return (CHM) kvs[0]; - } - - private static final int[] hashes(Object[] kvs) { - return (int[]) kvs[1]; - } - // Number of K,V pairs in the table - - private static final int len(Object[] kvs) { - return (kvs.length - 2) >> 1; - } - - // Time since last resize - private transient long _last_resize_milli; - - // --- Minimum table size ---------------- - // Pick size 8 K/V pairs, which turns into (8*2+2)*4+12 = 84 bytes on a - // standard 32-bit HotSpot, and (8*2+2)*8+12 = 156 bytes on 64-bit Azul. - private static final int MIN_SIZE_LOG = 3; // - - private static final int MIN_SIZE = (1 << MIN_SIZE_LOG); // Must be power of 2 - - // --- Sentinels ------------------------- - // No-Match-Old - putIfMatch does updates only if it matches the old value, - // and NO_MATCH_OLD basically counts as a wildcard match. - private static final Object NO_MATCH_OLD = new Object(); // Sentinel - // Match-Any-not-null - putIfMatch does updates only if it find a real old - // value. - - private static final Object MATCH_ANY = new Object(); // Sentinel - // This K/V pair has been deleted (but the Key slot is forever claimed). - // The same Key can be reinserted with a new value later. - - public static final Object TOMBSTONE = new Object(); - // Prime'd or box'd version of TOMBSTONE. This K/V pair was deleted, then a - // table resize started. The K/V pair has been marked so that no new - // updates can happen to the old table (and since the K/V pair was deleted - // nothing was copied to the new table). - - private static final Prime TOMBPRIME = new Prime(TOMBSTONE); - - // --- key,val ------------------------------------------------------------- - // Access K,V for a given idx - // - // Note that these are static, so that the caller is forced to read the _kvs - // field only once, and share that read across all key/val calls - lest the - // _kvs field move out from under us and back-to-back key & val calls refer - // to different _kvs arrays. - private static final Object key(Object[] kvs, int idx) { - return kvs[(idx << 1) + 2]; - } - - private static final Object val(Object[] kvs, int idx) { - return kvs[(idx << 1) + 3]; - } - - private static final boolean CAS_key(Object[] kvs, int idx, Object old, Object key) { - return UNSAFE.compareAndSwapObject(kvs, rawIndex(kvs, (idx << 1) + 2), old, key); - } - - private static final boolean CAS_val(Object[] kvs, int idx, Object old, Object val) { - return UNSAFE.compareAndSwapObject(kvs, rawIndex(kvs, (idx << 1) + 3), old, val); - } - - // --- dump ---------------------------------------------------------------- - // Verbose printout of table internals, useful for debugging. - public final void print() { - System.out.println("========="); - print2(_kvs); - System.out.println("========="); - } - // print the entire state of the table - - private final void print(Object[] kvs) { - for (int i = 0; i < len(kvs); i++) { - Object K = key(kvs, i); - if (K != null) { - String KS = (K == TOMBSTONE) ? "XXX" : K.toString(); - Object V = val(kvs, i); - Object U = Prime.unbox(V); - String p = (V == U) ? "" : "prime_"; - String US = (U == TOMBSTONE) ? "tombstone" : U.toString(); - System.out.println("" + i + " (" + KS + "," + p + US + ")"); - } - } - Object[] newkvs = chm(kvs)._newkvs; // New table, if any - if (newkvs != null) { - System.out.println("----"); - print(newkvs); - } - } - // print only the live values, broken down by the table they are in - - private final void print2(Object[] kvs) { - for (int i = 0; i < len(kvs); i++) { - Object key = key(kvs, i); - Object val = val(kvs, i); - Object U = Prime.unbox(val); - if (key != null && key != TOMBSTONE - && // key is sane - val != null && U != TOMBSTONE) { // val is sane - String p = (val == U) ? "" : "prime_"; - System.out.println("" + i + " (" + key + "," + p + val + ")"); - } - } - Object[] newkvs = chm(kvs)._newkvs; // New table, if any - if (newkvs != null) { - System.out.println("----"); - print2(newkvs); - } - } - - // Count of reprobes - private transient ConcurrentAutoTable _reprobes = new ConcurrentAutoTable(); - - /** Get and clear the current count of reprobes. Reprobes happen on key - * collisions, and a high reprobe rate may indicate a poor hash function or - * weaknesses in the table resizing function. - * - * @return the count of reprobes since the last call to {@link #reprobes} - * or since the table was created. */ - public long reprobes() { - long r = _reprobes.get(); - _reprobes = new ConcurrentAutoTable(); - return r; - } - - // --- reprobe_limit ----------------------------------------------------- - // Heuristic to decide if we have reprobed toooo many times. Running over - // the reprobe limit on a 'get' call acts as a 'miss'; on a 'put' call it - // can trigger a table resize. Several places must have exact agreement on - // what the reprobe_limit is, so we share it here. - private static int reprobe_limit(int len) { - return REPROBE_LIMIT + (len >> 4); - } - - // --- NonBlockingHashMap -------------------------------------------------- - // Constructors - /** Create a new NonBlockingHashMap with default minimum size (currently set - * to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). */ - public NonBlockingHashMap() { - this(MIN_SIZE); - } - - /** * Create a new NonBlockingHashMap with initial room for the given number of - * elements, thus avoiding internal resizing operations to reach an - * appropriate size.Large numbers here when used with a small count of - * elements will sacrifice space for a small amount of time gained. The - * initial size will be rounded up internally to the next larger power of 2. - * @param initial_sz int - */ - public NonBlockingHashMap(final int initial_sz) { - initialize(initial_sz); - } - - private final void initialize(int initial_sz) { - if (initial_sz < 0) { - throw new IllegalArgumentException("initial_sz: " + initial_sz + " (expected: >= 0)"); - } - int i; // Convert to next largest power-of-2 - if (initial_sz > 1024 * 1024) initial_sz = 1024 * 1024; - for (i = MIN_SIZE_LOG; (1 << i) < (initial_sz << 2); i++) ; - // Double size for K,V pairs, add 1 for CHM and 1 for hashes - _kvs = new Object[((1 << i) << 1) + 2]; - _kvs[0] = new CHM(new ConcurrentAutoTable()); // CHM in slot 0 - _kvs[1] = new int[1 << i]; // Matching hash entries - _last_resize_milli = System.currentTimeMillis(); - } - // Version for subclassed readObject calls, to be called after the defaultReadObject - - protected final void initialize() { - initialize(MIN_SIZE); - } - - // --- wrappers ------------------------------------------------------------ - /** Returns the number of key-value mappings in this map. - * - * @return the number of key-value mappings in this map */ - @Override - public int size() { - return chm(_kvs).size(); - } - - /** Returns <tt>size() == 0</tt>. - * - * @return <tt>size() == 0</tt> */ - @Override - public boolean isEmpty() { - return size() == 0; - } - - /** Tests if the key in the table using the <tt>equals</tt> method. - * - * @return <tt>true</tt> if the key is in the table using the <tt>equals</tt> method - * @throws NullPointerException if the specified key is null */ - @Override - public boolean containsKey(Object key) { - return get(key) != null; - } - - /** Legacy method testing if some key maps into the specified value in this - * table. This method is identical in functionality to {@link - * #containsValue}, and exists solely to ensure full compatibility with - * class {@link java.util.Hashtable}, which supported this method prior to - * introduction of the Java Collections framework. - * - * @param val a value to search for - * - * @return <tt>true</tt> if this map maps one or more keys to the specified value - * @throws NullPointerException if the specified value is null */ - public boolean contains(Object val) { - return containsValue(val); - } - - /** Maps the specified key to the specified value in the table. Neither key - * nor value can be null. - *

- * The value can be retrieved by calling {@link #get} with a key that is - * equal to the original key. - * - * @param key key with which the specified value is to be associated - * @param val value to be associated with the specified key - * - * @return the previous value associated with <tt>key</tt>, or - * <tt>null</tt> if there was no mapping for <tt>key</tt> - * @throws NullPointerException if the specified key or value is null */ - @Override - public TypeV put(TypeK key, TypeV val) { - return putIfMatch(key, val, NO_MATCH_OLD); - } - - /** Atomically, do a {@link #put} if-and-only-if the key is not mapped. - * Useful to ensure that only a single mapping for the key exists, even if - * many threads are trying to create the mapping in parallel. - * - * @return the previous value associated with the specified key, - * or <tt>null</tt> if there was no mapping for the key - * @throws NullPointerException if the specified key or value is null */ - @Override - public TypeV putIfAbsent(TypeK key, TypeV val) { - return putIfMatch(key, val, TOMBSTONE); - } - - /** Removes the key (and its corresponding value) from this map. - * This method does nothing if the key is not in the map. - * - * @return the previous value associated with <tt>key</tt>, or - * <tt>null</tt> if there was no mapping for <tt>key</tt> - * @throws NullPointerException if the specified key is null */ - @Override - public TypeV remove(Object key) { - return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD); - } - - /** Atomically do a {@link #remove(Object)} if-and-only-if the key is mapped - * to a value which is equals to the given value. - * - * @throws NullPointerException if the specified key or value is null */ - public boolean remove(Object key, Object val) { - return objectsEquals(putIfMatch(key, TOMBSTONE, val), val); - } - - /** Atomically do a put(key,val) if-and-only-if the key is - * mapped to some value already. - * - * @throws NullPointerException if the specified key or value is null */ - @Override - public TypeV replace(TypeK key, TypeV val) { - return putIfMatch(key, val, MATCH_ANY); - } - - /** Atomically do a put(key,newValue) if-and-only-if the key is - * mapped a value which is equals to oldValue. - * - * @throws NullPointerException if the specified key or value is null */ - @Override - public boolean replace(TypeK key, TypeV oldValue, TypeV newValue) { - return objectsEquals(putIfMatch(key, newValue, oldValue), oldValue); - } - - private static boolean objectsEquals(Object a, Object b) { - return (a == b) || (a != null && a.equals(b)); - } - - // Atomically replace newVal for oldVal, returning the value that existed - // there before. If the oldVal matches the returned value, then newVal was - // inserted, otherwise not. A null oldVal means the key does not exist (only - // insert if missing); a null newVal means to remove the key. - public final TypeV putIfMatchAllowNull(Object key, Object newVal, Object oldVal) { - if (oldVal == null) oldVal = TOMBSTONE; - if (newVal == null) newVal = TOMBSTONE; - final TypeV res = (TypeV) putIfMatch0(this, _kvs, key, newVal, oldVal); - assert !(res instanceof Prime); - //assert res != null; - return res == TOMBSTONE ? null : res; - } - - /** * Atomically replace newVal for oldVal, returning the value that existed - * there before.If the oldVal matches the returned value, then newVal was - * inserted, otherwise not. - * - * @param key key - * @param newVal newVal - * @param oldVal oldVal - * @return the previous value associated with the specified key, - * or <tt>null</tt> if there was no mapping for the key - * @throws NullPointerException if the key or either value is null - */ - public final TypeV putIfMatch(Object key, Object newVal, Object oldVal) { - if (oldVal == null || newVal == null) throw new NullPointerException(); - final Object res = putIfMatch0(this, _kvs, key, newVal, oldVal); - assert !(res instanceof Prime); - assert res != null; - return res == TOMBSTONE ? null : (TypeV) res; - } - - /** Copies all of the mappings from the specified map to this one, replacing - * any existing mappings. - * - * @param m mappings to be stored in this map */ - @Override - public void putAll(Map m) { - for (Map.Entry e : m.entrySet()) - put(e.getKey(), e.getValue()); - } - - /** Removes all of the mappings from this map. */ - @Override - public void clear() { // Smack a new empty table down - Object[] newkvs = new NonBlockingHashMap(MIN_SIZE)._kvs; - while (!CAS_kvs(_kvs, newkvs)) // Spin until the clear works - ; - } - - /** Returns <tt>true</tt> if this Map maps one or more keys to the specified - * value. Note: This method requires a full internal traversal of the - * hash table and is much slower than {@link #containsKey}. - * - * @param val value whose presence in this map is to be tested - * - * @return <tt>true</tt> if this map maps one or more keys to the specified value - * @throws NullPointerException if the specified value is null */ - @Override - public boolean containsValue(final Object val) { - if (val == null) throw new NullPointerException(); - for (TypeV V : values()) - if (V == val || V.equals(val)) - return true; - return false; - } - - // This function is supposed to do something for Hashtable, and the JCK - // tests hang until it gets called... by somebody ... for some reason, - // any reason.... - protected void rehash() { - //do nothing - } - - /** - * Creates a shallow copy of this hashtable. All the structure of the - * hashtable itself is copied, but the keys and values are not cloned. - * This is a relatively expensive operation. - * - * @return a clone of the hashtable. - */ - @Override - public Object clone() { - try { - // Must clone, to get the class right; NBHM might have been - // extended so it would be wrong to just make a new NBHM. - NonBlockingHashMap t = (NonBlockingHashMap) super.clone(); - // But I don't have an atomic clone operation - the underlying _kvs - // structure is undergoing rapid change. If I just clone the _kvs - // field, the CHM in _kvs[0] won't be in sync. - // - // Wipe out the cloned array (it was shallow anyways). - t.clear(); - // Now copy sanely - for (TypeK K : keySet()) { - final TypeV V = get(K); // Do an official 'get' - t.put(K, V); - } - return t; - } catch (CloneNotSupportedException e) { - // this shouldn't happen, since we are Cloneable - throw new InternalError(); - } - } - - /** - * Returns a string representation of this map. The string representation - * consists of a list of key-value mappings in the order returned by the - * map's <tt>entrySet</tt> view's iterator, enclosed in braces - * (<tt>"{}"</tt>). Adjacent mappings are separated by the characters - * <tt>", "</tt> (comma and space). Each key-value mapping is rendered as - * the key followed by an equals sign (<tt>"="</tt>) followed by the - * associated value. Keys and values are converted to strings as by - * {@link String#valueOf(Object)}. - * - * @return a string representation of this map - */ - @Override - public String toString() { - Iterator> i = entrySet().iterator(); - if (!i.hasNext()) - return "{}"; - - StringBuilder sb = new StringBuilder(); - sb.append('{'); - for (;;) { - Entry e = i.next(); - TypeK key = e.getKey(); - TypeV value = e.getValue(); - sb.append(key == this ? "(this Map)" : key); - sb.append('='); - sb.append(value == this ? "(this Map)" : value); - if (!i.hasNext()) - return sb.append('}').toString(); - sb.append(", "); - } - } - - // --- keyeq --------------------------------------------------------------- - // Check for key equality. Try direct pointer compare first, then see if - // the hashes are unequal (fast negative test) and finally do the full-on - // 'equals' v-call. - private static boolean keyeq(Object K, Object key, int[] hashes, int hash, int fullhash) { - return K == key - || // Either keys match exactly OR - // hash exists and matches? hash can be zero during the install of a - // new key/value pair. - ((hashes[hash] == 0 || hashes[hash] == fullhash) - && // Do not call the users' "equals()" call with a Tombstone, as this can - // surprise poorly written "equals()" calls that throw exceptions - // instead of simply returning false. - K != TOMBSTONE - && // Do not call users' equals call with a Tombstone - // Do the match the hard way - with the users' key being the loop- - // invariant "this" pointer. I could have flipped the order of - // operands (since equals is commutative), but I'm making mega-morphic - // v-calls in a re-probing loop and nailing down the 'this' argument - // gives both the JIT and the hardware a chance to prefetch the call target. - key.equals(K)); // Finally do the hard match - } - - // --- get ----------------------------------------------------------------- - /** * Returns the value to which the specified key is mapped, or {@code null} - * if this map contains no mapping for the key. - *

- * More formally, if this map contains a mapping from a key {@code k} to - * a value {@code v} such that {@code key.equals(k)}, then this method - * returns {@code v}; otherwise it returns {@code null}. (There can be at - * most one such mapping.) - * - * @param key key - * @return Type - * @throws NullPointerException if the specified key is null */ - // Never returns a Prime nor a Tombstone. - @Override - public TypeV get(Object key) { - final Object V = get_impl(this, _kvs, key); - assert !(V instanceof Prime); // Never return a Prime - assert V != TOMBSTONE; - return (TypeV) V; - } - - private static final Object get_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key) { - final int fullhash = hash(key); // throws NullPointerException if key is null - final int len = len(kvs); // Count of key/value pairs, reads kvs.length - final CHM chm = chm(kvs); // The CHM, for a volatile read below; reads slot 0 of kvs - final int[] hashes = hashes(kvs); // The memoized hashes; reads slot 1 of kvs - - int idx = fullhash & (len - 1); // First key hash - - // Main spin/reprobe loop, looking for a Key hit - int reprobe_cnt = 0; - while (true) { - // Probe table. Each read of 'val' probably misses in cache in a big - // table; hopefully the read of 'key' then hits in cache. - final Object K = key(kvs, idx); // Get key before volatile read, could be null - final Object V = val(kvs, idx); // Get value before volatile read, could be null or Tombstone or Prime - if (K == null) return null; // A clear miss - - // We need a volatile-read here to preserve happens-before semantics on - // newly inserted Keys. If the Key body was written just before inserting - // into the table a Key-compare here might read the uninitialized Key body. - // Annoyingly this means we have to volatile-read before EACH key compare. - // . - // We also need a volatile-read between reading a newly inserted Value - // and returning the Value (so the user might end up reading the stale - // Value contents). Same problem as with keys - and the one volatile - // read covers both. - final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare - - // Key-compare - if (keyeq(K, key, hashes, idx, fullhash)) { - // Key hit! Check for no table-copy-in-progress - if (!(V instanceof Prime)) // No copy? - return (V == TOMBSTONE) ? null : V; // Return the value - // Key hit - but slot is (possibly partially) copied to the new table. - // Finish the copy & retry in the new table. - return get_impl(topmap, chm.copy_slot_and_check(topmap, kvs, idx, key), key); // Retry in the new table - } - // get and put must have the same key lookup logic! But only 'put' - // needs to force a table-resize for a too-long key-reprobe sequence. - // Check for too-many-reprobes on get - and flip to the new table. - if (++reprobe_cnt >= reprobe_limit(len) - || // too many probes - K == TOMBSTONE) // found a TOMBSTONE key, means no more keys in this table - return newkvs == null ? null : get_impl(topmap, topmap.help_copy(newkvs), key); // Retry in the new table - - idx = (idx + 1) & (len - 1); // Reprobe by 1! (could now prefetch) - } - } - - // --- getk ----------------------------------------------------------------- - /** Returns the Key to which the specified key is mapped, or {@code null} - * if this map contains no mapping for the key. - * - * @param key TypeK - * @return TypeK - * @throws NullPointerException if the specified key is null */ - // Never returns a Prime nor a Tombstone. - public TypeK getk(TypeK key) { - return (TypeK) getk_impl(this, _kvs, key); - } - - private static final Object getk_impl(final NonBlockingHashMap topmap, final Object[] kvs, final Object key) { - final int fullhash = hash(key); // throws NullPointerException if key is null - final int len = len(kvs); // Count of key/value pairs, reads kvs.length - final CHM chm = chm(kvs); // The CHM, for a volatile read below; reads slot 0 of kvs - final int[] hashes = hashes(kvs); // The memoized hashes; reads slot 1 of kvs - - int idx = fullhash & (len - 1); // First key hash - - // Main spin/reprobe loop, looking for a Key hit - int reprobe_cnt = 0; - while (true) { - // Probe table. - final Object K = key(kvs, idx); // Get key before volatile read, could be null - if (K == null) return null; // A clear miss - - // We need a volatile-read here to preserve happens-before semantics on - // newly inserted Keys. If the Key body was written just before inserting - // into the table a Key-compare here might read the uninitialized Key body. - // Annoyingly this means we have to volatile-read before EACH key compare. - // . - // We also need a volatile-read between reading a newly inserted Value - // and returning the Value (so the user might end up reading the stale - // Value contents). Same problem as with keys - and the one volatile - // read covers both. - final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare - - // Key-compare - if (keyeq(K, key, hashes, idx, fullhash)) - return K; // Return existing Key! - - // get and put must have the same key lookup logic! But only 'put' - // needs to force a table-resize for a too-long key-reprobe sequence. - // Check for too-many-reprobes on get - and flip to the new table. - if (++reprobe_cnt >= reprobe_limit(len) - || // too many probes - K == TOMBSTONE) { // found a TOMBSTONE key, means no more keys in this table - return newkvs == null ? null : getk_impl(topmap, topmap.help_copy(newkvs), key); // Retry in the new table - } - - idx = (idx + 1) & (len - 1); // Reprobe by 1! (could now prefetch) - } - } - - static volatile int DUMMY_VOLATILE; - - /** - * Put, Remove, PutIfAbsent, etc. Return the old value. If the returned value is equal to expVal (or expVal is - * {@link #NO_MATCH_OLD}) then the put can be assumed to work (although might have been immediately overwritten). - * Only the path through copy_slot passes in an expected value of null, and putIfMatch only returns a null if passed - * in an expected null. - * - * @param topmap the map to act on - * @param kvs the KV table snapshot we act on - * @param key not null (will result in {@link NullPointerException}) - * @param putval the new value to use. Not null. {@link #TOMBSTONE} will result in deleting the entry. - * @param expVal expected old value. Can be null. {@link #NO_MATCH_OLD} for an unconditional put/remove. - * {@link #TOMBSTONE} if we expect old entry to not exist(null/{@link #TOMBSTONE} value). - * {@link #MATCH_ANY} will ignore the current value, but only if an entry exists. A null expVal is used - * internally to perform a strict insert-if-never-been-seen-before operation. - * - * @return {@link #TOMBSTONE} if key does not exist or match has failed. null if expVal is - * null AND old value was null. Otherwise the old entry value (not null). - */ - private static final Object putIfMatch0( - final NonBlockingHashMap topmap, - final Object[] kvs, - final Object key, - final Object putval, - final Object expVal) { - assert putval != null; - assert !(putval instanceof Prime); - assert !(expVal instanceof Prime); - final int fullhash = hash(key); // throws NullPointerException if key null - final int len = len(kvs); // Count of key/value pairs, reads kvs.length - final CHM chm = chm(kvs); // Reads kvs[0] - final int[] hashes = hashes(kvs); // Reads kvs[1], read before kvs[0] - int idx = fullhash & (len - 1); - - // --- - // Key-Claim stanza: spin till we can claim a Key (or force a resizing). - int reprobe_cnt = 0; - Object K = null, V = null; - Object[] newkvs = null; - while (true) { // Spin till we get a Key slot - V = val(kvs, idx); // Get old value (before volatile read below!) - K = key(kvs, idx); // Get current key - if (K == null) { // Slot is free? - // Found an empty Key slot - which means this Key has never been in - // this table. No need to put a Tombstone - the Key is not here! - if (putval == TOMBSTONE) return TOMBSTONE; // Not-now & never-been in this table - if (expVal == MATCH_ANY) return TOMBSTONE; // Will not match, even after K inserts - // Claim the null key-slot - if (CAS_key(kvs, idx, null, key)) { // Claim slot for Key - chm._slots.add(1); // Raise key-slots-used count - hashes[idx] = fullhash; // Memoize fullhash - break; // Got it! - } - // CAS to claim the key-slot failed. - // - // This re-read of the Key points out an annoying short-coming of Java - // CAS. Most hardware CAS's report back the existing value - so that - // if you fail you have a *witness* - the value which caused the CAS to - // fail. The Java API turns this into a boolean destroying the - // witness. Re-reading does not recover the witness because another - // thread can write over the memory after the CAS. Hence we can be in - // the unfortunate situation of having a CAS fail *for cause* but - // having that cause removed by a later store. This turns a - // non-spurious-failure CAS (such as Azul has) into one that can - // apparently spuriously fail - and we avoid apparent spurious failure - // by not allowing Keys to ever change. - - // Volatile read, to force loads of K to retry despite JIT, otherwise - // it is legal to e.g. haul the load of "K = key(kvs,idx);" outside of - // this loop (since failed CAS ops have no memory ordering semantics). - int dummy = DUMMY_VOLATILE; - continue; - } - // Key slot was not null, there exists a Key here - - // We need a volatile-read here to preserve happens-before semantics on - // newly inserted Keys. If the Key body was written just before inserting - // into the table a Key-compare here might read the uninitialized Key body. - // Annoyingly this means we have to volatile-read before EACH key compare. - newkvs = chm._newkvs; // VOLATILE READ before key compare - - if (keyeq(K, key, hashes, idx, fullhash)) - break; // Got it! - - // get and put must have the same key lookup logic! Lest 'get' give - // up looking too soon. - //topmap._reprobes.add(1); - if (++reprobe_cnt >= reprobe_limit(len) - || // too many probes or - K == TOMBSTONE) { // found a TOMBSTONE key, means no more keys - // We simply must have a new table to do a 'put'. At this point a - // 'get' will also go to the new table (if any). We do not need - // to claim a key slot (indeed, we cannot find a free one to claim!). - newkvs = chm.resize(topmap, kvs); - if (expVal != null) topmap.help_copy(newkvs); // help along an existing copy - return putIfMatch0(topmap, newkvs, key, putval, expVal); - } - - idx = (idx + 1) & (len - 1); // Reprobe! - } // End of spinning till we get a Key slot - - while (true) { // Spin till we insert a value - // --- - // Found the proper Key slot, now update the matching Value slot. We - // never put a null, so Value slots monotonically move from null to - // not-null (deleted Values use Tombstone). Thus if 'V' is null we - // fail this fast cutout and fall into the check for table-full. - if (putval == V) return V; // Fast cutout for no-change - - // See if we want to move to a new table (to avoid high average re-probe - // counts). We only check on the initial set of a Value from null to - // not-null (i.e., once per key-insert). Of course we got a 'free' check - // of newkvs once per key-compare (not really free, but paid-for by the - // time we get here). - if (newkvs == null - && // New table-copy already spotted? - // Once per fresh key-insert check the hard way - ((V == null && chm.tableFull(reprobe_cnt, len)) - || // Or we found a Prime, but the JMM allowed reordering such that we - // did not spot the new table (very rare race here: the writing - // thread did a CAS of _newkvs then a store of a Prime. This thread - // reads the Prime, then reads _newkvs - but the read of Prime was so - // delayed (or the read of _newkvs was so accelerated) that they - // swapped and we still read a null _newkvs. The resize call below - // will do a CAS on _newkvs forcing the read. - V instanceof Prime)) { - newkvs = chm.resize(topmap, kvs); // Force the new table copy to start - } - // See if we are moving to a new table. - // If so, copy our slot and retry in the new table. - if (newkvs != null) { - return putIfMatch0(topmap, chm.copy_slot_and_check(topmap, kvs, idx, expVal), key, putval, expVal); - } - // --- - // We are finally prepared to update the existing table - assert !(V instanceof Prime); - - // Must match old, and we do not? Then bail out now. Note that either V - // or expVal might be TOMBSTONE. Also V can be null, if we've never - // inserted a value before. expVal can be null if we are called from - // copy_slot. - if (expVal != NO_MATCH_OLD - && // Do we care about expected-Value at all? - V != expVal - && // No instant match already? - (expVal != MATCH_ANY || V == TOMBSTONE || V == null) - && !(V == null && expVal == TOMBSTONE) - && // Match on null/TOMBSTONE combo - (expVal == null || !expVal.equals(V))) { // Expensive equals check at the last - return (V == null) ? TOMBSTONE : V; // Do not update! - } - - // Actually change the Value in the Key,Value pair - if (CAS_val(kvs, idx, V, putval)) break; - - // CAS failed - // Because we have no witness, we do not know why it failed. - // Indeed, by the time we look again the value under test might have flipped - // a thousand times and now be the expected value (despite the CAS failing). - // Check for the never-succeed condition of a Prime value and jump to any - // nested table, or else just re-run. - // We would not need this load at all if CAS returned the value on which - // the CAS failed (AKA witness). The new CAS semantics are supported via - // VarHandle in JDK9. - V = val(kvs, idx); // Get new value - - // If a Prime'd value got installed, we need to re-run the put on the - // new table. Otherwise we lost the CAS to another racing put. - if (V instanceof Prime) - return putIfMatch0(topmap, chm.copy_slot_and_check(topmap, kvs, idx, expVal), key, putval, expVal); - - // Simply retry from the start. - // NOTE: need the fence, since otherwise 'val(kvs,idx)' load could be hoisted - // out of loop. - int dummy = DUMMY_VOLATILE; - } - - // CAS succeeded - we did the update! - // Both normal put's and table-copy calls putIfMatch, but table-copy - // does not (effectively) increase the number of live k/v pairs. - if (expVal != null) { - // Adjust sizes - a striped counter - if ((V == null || V == TOMBSTONE) && putval != TOMBSTONE) chm._size.add(1); - if (!(V == null || V == TOMBSTONE) && putval == TOMBSTONE) chm._size.add(-1); - } - - // We won; we know the update happened as expected. - return (V == null && expVal != null) ? TOMBSTONE : V; - } - - // --- help_copy --------------------------------------------------------- - // Help along an existing resize operation. This is just a fast cut-out - // wrapper, to encourage inlining for the fast no-copy-in-progress case. We - // always help the top-most table copy, even if there are nested table - // copies in progress. - private final Object[] help_copy(Object[] helper) { - // Read the top-level KVS only once. We'll try to help this copy along, - // even if it gets promoted out from under us (i.e., the copy completes - // and another KVS becomes the top-level copy). - Object[] topkvs = _kvs; - CHM topchm = chm(topkvs); - if (topchm._newkvs == null) return helper; // No copy in-progress - topchm.help_copy_impl(this, topkvs, false); - return helper; - } - - // --- CHM ----------------------------------------------------------------- - // The control structure for the NonBlockingHashMap - private static final class CHM { - // Size in active K,V pairs - - private final ConcurrentAutoTable _size; - - public int size() { - return (int) _size.get(); - } - - // --- - // These next 2 fields are used in the resizing heuristics, to judge when - // it is time to resize or copy the table. Slots is a count of used-up - // key slots, and when it nears a large fraction of the table we probably - // end up reprobing too much. Last-resize-milli is the time since the - // last resize; if we are running back-to-back resizes without growing - // (because there are only a few live keys but many slots full of dead - // keys) then we need a larger table to cut down on the churn. - // Count of used slots, to tell when table is full of dead unusable slots - private final ConcurrentAutoTable _slots; - - public int slots() { - return (int) _slots.get(); - } - - // --- - // New mappings, used during resizing. - // The 'new KVs' array - created during a resize operation. This - // represents the new table being copied from the old one. It's the - // volatile variable that is read as we cross from one table to the next, - // to get the required memory orderings. It monotonically transits from - // null to set (once). - volatile Object[] _newkvs; - - private static final AtomicReferenceFieldUpdater _newkvsUpdater - = AtomicReferenceFieldUpdater.newUpdater(CHM.class, Object[].class, "_newkvs"); - // Set the _next field if we can. - - boolean CAS_newkvs(Object[] newkvs) { - while (_newkvs == null) - if (_newkvsUpdater.compareAndSet(this, null, newkvs)) - return true; - return false; - } - - // Sometimes many threads race to create a new very large table. Only 1 - // wins the race, but the losers all allocate a junk large table with - // hefty allocation costs. Attempt to control the overkill here by - // throttling attempts to create a new table. I cannot really block here - // (lest I lose the non-blocking property) but late-arriving threads can - // give the initial resizing thread a little time to allocate the initial - // new table. The Right Long Term Fix here is to use array-lets and - // incrementally create the new very large array. In C I'd make the array - // with malloc (which would mmap under the hood) which would only eat - // virtual-address and not real memory - and after Somebody wins then we - // could in parallel initialize the array. Java does not allow - // un-initialized array creation (especially of ref arrays!). - volatile long _resizers; // count of threads attempting an initial resize - - private static final AtomicLongFieldUpdater _resizerUpdater - = AtomicLongFieldUpdater.newUpdater(CHM.class, "_resizers"); - - // --- - // Simple constructor - CHM(ConcurrentAutoTable size) { - _size = size; - _slots = new ConcurrentAutoTable(); - } - - // --- tableFull --------------------------------------------------------- - // Heuristic to decide if this table is too full, and we should start a - // new table. Note that if a 'get' call has reprobed too many times and - // decided the table must be full, then always the estimate_sum must be - // high and we must report the table is full. If we do not, then we might - // end up deciding that the table is not full and inserting into the - // current table, while a 'get' has decided the same key cannot be in this - // table because of too many reprobes. The invariant is: - // slots.estimate_sum >= max_reprobe_cnt >= reprobe_limit(len) - private final boolean tableFull(int reprobe_cnt, int len) { - return // Do the cheap check first: we allow some number of reprobes always - reprobe_cnt >= REPROBE_LIMIT - && (reprobe_cnt >= reprobe_limit(len) - || // More expensive check: see if the table is > 1/2 full. - _slots.estimate_get() >= (len >> 1)); - } - - // --- resize ------------------------------------------------------------ - // Resizing after too many probes. "How Big???" heuristics are here. - // Callers will (not this routine) will 'help_copy' any in-progress copy. - // Since this routine has a fast cutout for copy-already-started, callers - // MUST 'help_copy' lest we have a path which forever runs through - // 'resize' only to discover a copy-in-progress which never progresses. - private final Object[] resize(NonBlockingHashMap topmap, Object[] kvs) { - assert chm(kvs) == this; - - // Check for resize already in progress, probably triggered by another thread - Object[] newkvs = _newkvs; // VOLATILE READ - if (newkvs != null) // See if resize is already in progress - return newkvs; // Use the new table already - - // No copy in-progress, so start one. First up: compute new table size. - int oldlen = len(kvs); // Old count of K,V pairs allowed - int sz = size(); // Get current table count of active K,V pairs - int newsz = sz; // First size estimate - - // Heuristic to determine new size. We expect plenty of dead-slots-with-keys - // and we need some decent padding to avoid endless reprobing. - if (sz >= (oldlen >> 2)) { // If we are >25% full of keys then... - newsz = oldlen << 1; // Double size, so new table will be between 12.5% and 25% full - // For tables less than 1M entries, if >50% full of keys then... - // For tables more than 1M entries, if >75% full of keys then... - if (4L * sz >= ((oldlen >> 20) != 0 ? 3L : 2L) * oldlen) - newsz = oldlen << 2; // Double double size, so new table will be between %12.5 (18.75%) and 25% (25%) - } - // This heuristic in the next 2 lines leads to a much denser table - // with a higher reprobe rate - //if( sz >= (oldlen>>1) ) // If we are >50% full of keys then... - // newsz = oldlen<<1; // Double size - - // Last (re)size operation was very recent? Then double again - // despite having few live keys; slows down resize operations - // for tables subject to a high key churn rate - but do not - // forever grow the table. If there is a high key churn rate - // the table needs a steady state of rare same-size resize - // operations to clean out the dead keys. - long tm = System.currentTimeMillis(); - if (newsz <= oldlen - && // New table would shrink or hold steady? - tm <= topmap._last_resize_milli + 10000) // Recent resize (less than 10 sec ago) - newsz = oldlen << 1; // Double the existing size - // Do not shrink, ever. If we hit this size once, assume we - // will again. - if (newsz < oldlen) newsz = oldlen; - // Convert to power-of-2 - int log2; - for (log2 = MIN_SIZE_LOG; (1 << log2) < newsz; log2++) ; // Compute log2 of size - long len = ((1L << log2) << 1) + 2; - // prevent integer overflow - limit of 2^31 elements in a Java array - // so here, 2^30 + 2 is the largest number of elements in the hash table - if ((int) len != len) { - log2 = 30; - len = (1L << log2) + 2; - if (sz > ((len >> 2) + (len >> 1))) throw new RuntimeException("Table is full."); - } - - // Now limit the number of threads actually allocating memory to a - // handful - lest we have 750 threads all trying to allocate a giant - // resized array. - long r = _resizers; - while (!_resizerUpdater.compareAndSet(this, r, r + 1)) - r = _resizers; - // Size calculation: 2 words (K+V) per table entry, plus a handful. We - // guess at 64-bit pointers; 32-bit pointers screws up the size calc by - // 2x but does not screw up the heuristic very much. - long megs = ((((1L << log2) << 1) + 8) << 3/* word to bytes */) >> 20/* megs */; - if (r >= 2 && megs > 0) { // Already 2 guys trying; wait and see - newkvs = _newkvs; // Between dorking around, another thread did it - if (newkvs != null) // See if resize is already in progress - return newkvs; // Use the new table already - // TO DO - use a wait with timeout, so we'll wakeup as soon as the new table - // is ready, or after the timeout in any case. - // For now, sleep a tad and see if the 2 guys already trying to make - // the table actually get around to making it happen. - try { - Thread.sleep(megs); - } catch (Exception e) { - //do nothing - } - } - // Last check, since the 'new' below is expensive and there is a chance - // that another thread slipped in a new thread while we ran the heuristic. - newkvs = _newkvs; - if (newkvs != null) // See if resize is already in progress - return newkvs; // Use the new table already - - // Double size for K,V pairs, add 1 for CHM - newkvs = new Object[(int) len]; // This can get expensive for big arrays - newkvs[0] = new CHM(_size); // CHM in slot 0 - newkvs[1] = new int[1 << log2]; // hashes in slot 1 - - // Another check after the slow allocation - if (_newkvs != null) // See if resize is already in progress - return _newkvs; // Use the new table already - - // The new table must be CAS'd in so only 1 winner amongst duplicate - // racing resizing threads. Extra CHM's will be GC'd. - if (CAS_newkvs(newkvs)) { // NOW a resize-is-in-progress! - //notifyAll(); // Wake up any sleepers - //long nano = System.nanoTime(); - //System.out.println(" "+nano+" Resize from "+oldlen+" to "+(1< _copyIdxUpdater - = AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyIdx"); - - // Work-done reporting. Used to efficiently signal when we can move to - // the new table. From 0 to len(oldkvs) refers to copying from the old - // table to the new. - volatile long _copyDone = 0; - - static private final AtomicLongFieldUpdater _copyDoneUpdater - = AtomicLongFieldUpdater.newUpdater(CHM.class, "_copyDone"); - - // --- help_copy_impl ---------------------------------------------------- - // Help along an existing resize operation. We hope its the top-level - // copy (it was when we started) but this CHM might have been promoted out - // of the top position. - private final void help_copy_impl(NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all) { - assert chm(oldkvs) == this; - Object[] newkvs = _newkvs; - assert newkvs != null; // Already checked by caller - int oldlen = len(oldkvs); // Total amount to copy - final int MIN_COPY_WORK = Math.min(oldlen, 1024); // Limit per-thread work - - // --- - int panic_start = -1; - int copyidx = -9999; // Fool javac to think it's initialized - while (_copyDone < oldlen) { // Still needing to copy? - // Carve out a chunk of work. The counter wraps around so every - // thread eventually tries to copy every slot repeatedly. - - // We "panic" if we have tried TWICE to copy every slot - and it still - // has not happened. i.e., twice some thread somewhere claimed they - // would copy 'slot X' (by bumping _copyIdx) but they never claimed to - // have finished (by bumping _copyDone). Our choices become limited: - // we can wait for the work-claimers to finish (and become a blocking - // algorithm) or do the copy work ourselves. Tiny tables with huge - // thread counts trying to copy the table often 'panic'. - if (panic_start == -1) { // No panic? - copyidx = (int) _copyIdx; - while (!_copyIdxUpdater.compareAndSet(this, copyidx, copyidx + MIN_COPY_WORK)) - copyidx = (int) _copyIdx; // Re-read - if (!(copyidx < (oldlen << 1))) // Panic! - panic_start = copyidx; // Record where we started to panic-copy - } - - // We now know what to copy. Try to copy. - int workdone = 0; - for (int i = 0; i < MIN_COPY_WORK; i++) - if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs, newkvs)) // Made an oldtable slot go dead? - workdone++; // Yes! - if (workdone > 0) // Report work-done occasionally - copy_check_and_promote(topmap, oldkvs, workdone);// See if we can promote - //for( int i=0; i 0) { - while (!_copyDoneUpdater.compareAndSet(this, copyDone, copyDone + workdone)) { - copyDone = _copyDone; // Reload, retry - assert (copyDone + workdone) <= oldlen; - } - } - - // Check for copy being ALL done, and promote. Note that we might have - // nested in-progress copies and manage to finish a nested copy before - // finishing the top-level copy. We only promote top-level copies. - if (copyDone + workdone == oldlen - && // Ready to promote this table? - topmap._kvs == oldkvs - && // Looking at the top-level table? - // Attempt to promote - topmap.CAS_kvs(oldkvs, _newkvs)) { - topmap._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check - } - } - - // --- copy_slot --------------------------------------------------------- - // Copy one K/V pair from oldkvs[i] to newkvs. Returns true if we can - // confirm that we set an old-table slot to TOMBPRIME, and only returns after - // updating the new table. We need an accurate confirmed-copy count so - // that we know when we can promote (if we promote the new table too soon, - // other threads may 'miss' on values not-yet-copied from the old table). - // We don't allow any direct updates on the new table, unless they first - // happened to the old table - so that any transition in the new table from - // null to not-null must have been from a copy_slot (or other old-table - // overwrite) and not from a thread directly writing in the new table. - private boolean copy_slot(NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs) { - // Blindly set the key slot from null to TOMBSTONE, to eagerly stop - // fresh put's from inserting new values in the old table when the old - // table is mid-resize. We don't need to act on the results here, - // because our correctness stems from box'ing the Value field. Slamming - // the Key field is a minor speed optimization. - Object key; - while ((key = key(oldkvs, idx)) == null) - CAS_key(oldkvs, idx, null, TOMBSTONE); - - // --- - // Prevent new values from appearing in the old table. - // Box what we see in the old table, to prevent further updates. - Object oldval = val(oldkvs, idx); // Read OLD table - while (!(oldval instanceof Prime)) { - final Prime box = (oldval == null || oldval == TOMBSTONE) ? TOMBPRIME : new Prime(oldval); - if (CAS_val(oldkvs, idx, oldval, box)) { // CAS down a box'd version of oldval - // If we made the Value slot hold a TOMBPRIME, then we both - // prevented further updates here but also the (absent) - // oldval is vacuously available in the new table. We - // return with true here: any thread looking for a value for - // this key can correctly go straight to the new table and - // skip looking in the old table. - if (box == TOMBPRIME) - return true; - // Otherwise we boxed something, but it still needs to be - // copied into the new table. - oldval = box; // Record updated oldval - break; // Break loop; oldval is now boxed by us - } - oldval = val(oldkvs, idx); // Else try, try again - } - if (oldval == TOMBPRIME) return false; // Copy already complete here! - - // --- - // Copy the value into the new table, but only if we overwrite a null. - // If another value is already in the new table, then somebody else - // wrote something there and that write is happens-after any value that - // appears in the old table. - Object old_unboxed = ((Prime) oldval)._V; - assert old_unboxed != TOMBSTONE; - putIfMatch0(topmap, newkvs, key, old_unboxed, null); - - // --- - // Finally, now that any old value is exposed in the new table, we can - // forever hide the old-table value by slapping a TOMBPRIME down. This - // will stop other threads from uselessly attempting to copy this slot - // (i.e., it's a speed optimization not a correctness issue). - while (oldval != TOMBPRIME && !CAS_val(oldkvs, idx, oldval, TOMBPRIME)) - oldval = val(oldkvs, idx); - - return oldval != TOMBPRIME; // True if we slammed the TOMBPRIME down - } // end copy_slot - } // End of CHM - - // --- Snapshot ------------------------------------------------------------ - // The main class for iterating over the NBHM. It "snapshots" a clean - // view of the K/V array. - private class SnapshotV implements Iterator, Enumeration { - - final Object[] _sskvs; - - public SnapshotV() { - while (true) { // Verify no table-copy-in-progress - Object[] topkvs = _kvs; - CHM topchm = chm(topkvs); - if (topchm._newkvs == null) { // No table-copy-in-progress - // The "linearization point" for the iteration. Every key in this - // table will be visited, but keys added later might be skipped or - // even be added to a following table (also not iterated over). - _sskvs = topkvs; - break; - } - // Table copy in-progress - so we cannot get a clean iteration. We - // must help finish the table copy before we can start iterating. - topchm.help_copy_impl(NonBlockingHashMap.this, topkvs, true); - } - // Warm-up the iterator - next(); - } - - int length() { - return len(_sskvs); - } - - Object key(int idx) { - return NonBlockingHashMap.key(_sskvs, idx); - } - - private int _idx; // Varies from 0-keys.length - - private Object _nextK, _prevK; // Last 2 keys found - - private TypeV _nextV, _prevV; // Last 2 values found - - public boolean hasNext() { - return _nextV != null; - } - - public TypeV next() { - // 'next' actually knows what the next value will be - it had to - // figure that out last go-around lest 'hasNext' report true and - // some other thread deleted the last value. Instead, 'next' - // spends all its effort finding the key that comes after the - // 'next' key. - if (_idx != 0 && _nextV == null) throw new NoSuchElementException(); - _prevK = _nextK; // This will become the previous key - _prevV = _nextV; // This will become the previous value - _nextV = null; // We have no more next-key - // Attempt to set <_nextK,_nextV> to the next K,V pair. - // _nextV is the trigger: stop searching when it is != null - while (_idx < length()) { // Scan array - _nextK = key(_idx++); // Get a key that definitely is in the set (for the moment!) - if (_nextK != null - && // Found something? - _nextK != TOMBSTONE - && (_nextV = get(_nextK)) != null) - break; // Got it! _nextK is a valid Key - } // Else keep scanning - return _prevV; // Return current value. - } - - public void removeKey() { - if (_prevV == null) throw new IllegalStateException(); - NonBlockingHashMap.this.putIfMatch(_prevK, TOMBSTONE, NO_MATCH_OLD); - _prevV = null; - } - - @Override - public void remove() { - // NOTE: it would seem logical that value removal will semantically mean removing the matching value for the - // mapping , but the JDK always removes by key, even when the value has changed. - removeKey(); - } - - public TypeV nextElement() { - return next(); - } - - public boolean hasMoreElements() { - return hasNext(); - } - } - - public Object[] raw_array() { - return new SnapshotV()._sskvs; - } - - /** Returns an enumeration of the values in this table. - * - * @return an enumeration of the values in this table - * @see #values() */ - public Enumeration elements() { - return new SnapshotV(); - } - - // --- values -------------------------------------------------------------- - /** Returns a {@link Collection} view of the values contained in this map. - * The collection is backed by the map, so changes to the map are reflected - * in the collection, and vice-versa. The collection supports element - * removal, which removes the corresponding mapping from this map, via the - * <tt>Iterator.remove</tt>, <tt>Collection.remove</tt>, - * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt> operations. - * It does not support the <tt>add</tt> or <tt>addAll</tt> operations. - * - *

- * The view's <tt>iterator</tt> is a "weakly consistent" iterator that - * will never throw {@link ConcurrentModificationException}, and guarantees - * to traverse elements as they existed upon construction of the iterator, - * and may (but is not guaranteed to) reflect any modifications subsequent - * to construction. */ - @Override - public Collection values() { - return new AbstractCollection() { - @Override - public void clear() { - NonBlockingHashMap.this.clear(); - } - - @Override - public int size() { - return NonBlockingHashMap.this.size(); - } - - @Override - public boolean contains(Object v) { - return NonBlockingHashMap.this.containsValue(v); - } - - @Override - public Iterator iterator() { - return new SnapshotV(); - } - }; - } - - // --- keySet -------------------------------------------------------------- - private class SnapshotK implements Iterator, Enumeration { - - final SnapshotV _ss; - - public SnapshotK() { - _ss = new SnapshotV(); - } - - public void remove() { - _ss.removeKey(); - } - - public TypeK next() { - _ss.next(); - return (TypeK) _ss._prevK; - } - - public boolean hasNext() { - return _ss.hasNext(); - } - - public TypeK nextElement() { - return next(); - } - - public boolean hasMoreElements() { - return hasNext(); - } - } - - /** Returns an enumeration of the keys in this table. - * - * @return an enumeration of the keys in this table - * @see #keySet() */ - public Enumeration keys() { - return new SnapshotK(); - } - - /** Returns a {@link Set} view of the keys contained in this map. The set - * is backed by the map, so changes to the map are reflected in the set, - * and vice-versa. The set supports element removal, which removes the - * corresponding mapping from this map, via the <tt>Iterator.remove</tt>, - * <tt>Set.remove</tt>, <tt>removeAll</tt>, <tt>retainAll</tt>, and - * <tt>clear</tt> operations. It does not support the <tt>add</tt> or - * <tt>addAll</tt> operations. - * - *

- * The view's <tt>iterator</tt> is a "weakly consistent" iterator that - * will never throw {@link ConcurrentModificationException}, and guarantees - * to traverse elements as they existed upon construction of the iterator, - * and may (but is not guaranteed to) reflect any modifications subsequent - * to construction. */ - @Override - public Set keySet() { - return new AbstractSet() { - @Override - public void clear() { - NonBlockingHashMap.this.clear(); - } - - @Override - public int size() { - return NonBlockingHashMap.this.size(); - } - - @Override - public boolean contains(Object k) { - return NonBlockingHashMap.this.containsKey(k); - } - - @Override - public boolean remove(Object k) { - return NonBlockingHashMap.this.remove(k) != null; - } - - @Override - public Iterator iterator() { - return new SnapshotK(); - } - // This is an efficient implementation of toArray instead of the standard - // one. In particular it uses a smart iteration over the NBHM. - - @Override - public T[] toArray(T[] a) { - Object[] kvs = raw_array(); - // Estimate size of array; be prepared to see more or fewer elements - int sz = size(); - T[] r = a.length >= sz ? a - : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), sz); - // Fast efficient element walk. - int j = 0; - for (int i = 0; i < len(kvs); i++) { - Object K = key(kvs, i); - Object V = Prime.unbox(val(kvs, i)); - if (K != null && K != TOMBSTONE && V != null && V != TOMBSTONE) { - if (j >= r.length) { - int sz2 = (int) Math.min(Integer.MAX_VALUE - 8, ((long) j) << 1); - if (sz2 <= r.length) throw new OutOfMemoryError("Required array size too large"); - r = Arrays.copyOf(r, sz2); - } - r[j++] = (T) K; - } - } - if (j <= a.length) { // Fit in the original array? - if (a != r) System.arraycopy(r, 0, a, 0, j); - if (j < a.length) r[j++] = null; // One final null not in the spec but in the default impl - return a; // Return the original - } - return Arrays.copyOf(r, j); - } - }; - } - - // --- entrySet ------------------------------------------------------------ - // Warning: Each call to 'next' in this iterator constructs a new NBHMEntry. - private class NBHMEntry extends AbstractEntry { - - NBHMEntry(final TypeK k, final TypeV v) { - super(k, v); - } - - public TypeV setValue(final TypeV val) { - if (val == null) throw new NullPointerException(); - _val = val; - return put(_key, val); - } - } - - private class SnapshotE implements Iterator> { - - final SnapshotV _ss; - - public SnapshotE() { - _ss = new SnapshotV(); - } - - public void remove() { - // NOTE: it would seem logical that entry removal will semantically mean removing the matching pair , but - // the JDK always removes by key, even when the value has changed. - _ss.removeKey(); - } - - public Map.Entry next() { - _ss.next(); - return new NBHMEntry((TypeK) _ss._prevK, _ss._prevV); - } - - public boolean hasNext() { - return _ss.hasNext(); - } - } - - /** Returns a {@link Set} view of the mappings contained in this map. The - * set is backed by the map, so changes to the map are reflected in the - * set, and vice-versa. The set supports element removal, which removes - * the corresponding mapping from the map, via the - * <tt>Iterator.remove</tt>, <tt>Set.remove</tt>, <tt>removeAll</tt>, - * <tt>retainAll</tt>, and <tt>clear</tt> operations. It does not support - * the <tt>add</tt> or <tt>addAll</tt> operations. - * - *

- * The view's <tt>iterator</tt> is a "weakly consistent" iterator - * that will never throw {@link ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed to) - * reflect any modifications subsequent to construction. - * - *

- * Warning: the iterator associated with this Set - * requires the creation of {@link java.util.Map.Entry} objects with each - * iteration. The NonBlockingHashMap does not normally create or - * using {@link java.util.Map.Entry} objects so they will be created soley - * to support this iteration. Iterating using Map#keySet or Map##values will be more efficient. - */ - @Override - public Set> entrySet() { - return new AbstractSet>() { - @Override - public void clear() { - NonBlockingHashMap.this.clear(); - } - - @Override - public int size() { - return NonBlockingHashMap.this.size(); - } - - @Override - public boolean remove(final Object o) { - if (!(o instanceof Map.Entry)) return false; - final Map.Entry e = (Map.Entry) o; - return NonBlockingHashMap.this.remove(e.getKey(), e.getValue()); - } - - @Override - public boolean contains(final Object o) { - if (!(o instanceof Map.Entry)) return false; - final Map.Entry e = (Map.Entry) o; - TypeV v = get(e.getKey()); - return v != null && v.equals(e.getValue()); - } - - @Override - public Iterator> iterator() { - return new SnapshotE(); - } - }; - } - - // --- writeObject ------------------------------------------------------- - // Write a NBHM to a stream - private void writeObject(java.io.ObjectOutputStream s) throws IOException { - s.defaultWriteObject(); // Nothing to write - for (Object K : keySet()) { - final Object V = get(K); // Do an official 'get' - s.writeObject(K); // Write the pair - s.writeObject(V); - } - s.writeObject(null); // Sentinel to indicate end-of-data - s.writeObject(null); - } - - // --- readObject -------------------------------------------------------- - // Read a CHM from a stream - private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { - s.defaultReadObject(); // Read nothing - initialize(MIN_SIZE); - for (;;) { - final TypeK K = (TypeK) s.readObject(); - final TypeV V = (TypeV) s.readObject(); - if (K == null) break; - put(K, V); // Insert with an offical put - } - } - - protected abstract class AbstractEntry implements Map.Entry { - - /** Strongly typed key */ - protected final TypeK _key; - - /** Strongly typed value */ - protected TypeV _val; - - public AbstractEntry(final TypeK key, final TypeV val) { - _key = key; - _val = val; - } - - public AbstractEntry(final Map.Entry e) { - _key = e.getKey(); - _val = e.getValue(); - } - - /** Return "key=val" string */ - public String toString() { - return _key + "=" + _val; - } - - /** Return key */ - public TypeK getKey() { - return _key; - } - - /** Return val */ - public TypeV getValue() { - return _val; - } - - /** Equal if the underlying key and value are equal */ - public boolean equals(final Object o) { - if (!(o instanceof Map.Entry)) return false; - final Map.Entry e = (Map.Entry) o; - return eq(_key, e.getKey()) && eq(_val, e.getValue()); - } - - /** Compute "key.hashCode() ^ val.hashCode()" */ - public int hashCode() { - return ((_key == null) ? 0 : _key.hashCode()) - ^ ((_val == null) ? 0 : _val.hashCode()); - } - - private boolean eq(final Object o1, final Object o2) { - return (o1 == null ? o2 == null : o1.equals(o2)); - } - } - - protected static class ConcurrentAutoTable implements Serializable { - - // --- public interface --- - /** - * Add the given value to current counter value.Concurrent updates will - * not be lost, but addAndGet or getAndAdd are not implemented because the - * total counter value (i.e., {@link #get}) is not atomically updated. Updates are striped across an array of counters to avoid cache contention - * and has been tested with performance scaling linearly up to 768 CPUs. - * @param x long - */ - public void add(long x) { - add_if(x); - } - - /** {@link #add} with -1 */ - public void decrement() { - add_if(-1L); - } - - /** {@link #add} with +1 */ - public void increment() { - add_if(1L); - } - - /** * Atomically set the sum of the striped counters to specified value.Rather more expensive than a simple store, in order to remain atomic. - * @param x long - */ - public void set(long x) { - CAT newcat = new CAT(null, 4, x); - // Spin until CAS works - while (!CAS_cat(_cat, newcat)) {/* empty */ - } - } - - /** - * Current value of the counter.Since other threads are updating furiously - * the value is only approximate, but it includes all counts made by the - * current thread. Requires a pass over the internally striped counters. - * @return long - */ - public long get() { - return _cat.sum(); - } - - /** Same as {@link #get}, included for completeness. - * @return int */ - public int intValue() { - return (int) _cat.sum(); - } - - /** Same as {@link #get}, included for completeness. - * @return long */ - public long longValue() { - return _cat.sum(); - } - - /** - * A cheaper {@link #get}.Updated only once/millisecond, but as fast as a simple load instruction when not updating. - * @return long - */ - public long estimate_get() { - return _cat.estimate_sum(); - } - - /** - * Return the counter's {@code long} value converted to a string. - * @return String - */ - @Override - public String toString() { - return _cat.toString(); - } - - /** - * A more verbose print than {@link #toString}, showing internal structure. - * Useful for debugging. - */ - public void print() { - _cat.print(); - } - - /** - * Return the internal counter striping factor.Useful for diagnosing performance problems. - * @return int - */ - public int internal_size() { - return _cat._t.length; - } - - // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can - // overflow. Value is CAS'd so no counts are lost. The CAS is retried until - // it succeeds. Returned value is the old value. - private long add_if(long x) { - return _cat.add_if(x, hash(), this); - } - - // The underlying array of concurrently updated long counters - private volatile CAT _cat = new CAT(null, 16/* Start Small, Think Big! */, 0L); - - private static AtomicReferenceFieldUpdater _catUpdater - = AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class, CAT.class, "_cat"); - - private boolean CAS_cat(CAT oldcat, CAT newcat) { - return _catUpdater.compareAndSet(this, oldcat, newcat); - } - - // Hash spreader - private static int hash() { - //int h = (int)Thread.currentThread().getId(); - int h = System.identityHashCode(Thread.currentThread()); - return h << 3; // Pad out cache lines. The goal is to avoid cache-line contention - } - - // --- CAT ----------------------------------------------------------------- - private static class CAT implements Serializable { - - // Unsafe crud: get a function which will CAS arrays - private static final int _Lbase = UNSAFE == null ? 0 : UNSAFE.arrayBaseOffset(long[].class); - - private static final int _Lscale = UNSAFE == null ? 0 : UNSAFE.arrayIndexScale(long[].class); - - private static long rawIndex(long[] ary, int i) { - assert i >= 0 && i < ary.length; - return _Lbase + (i * (long) _Lscale); - } - - private static boolean CAS(long[] A, int idx, long old, long nnn) { - return UNSAFE.compareAndSwapLong(A, rawIndex(A, idx), old, nnn); - } - - //volatile long _resizers; // count of threads attempting a resize - //static private final AtomicLongFieldUpdater _resizerUpdater = - // AtomicLongFieldUpdater.newUpdater(CAT.class, "_resizers"); - private final CAT _next; - - private volatile long _fuzzy_sum_cache; - - private volatile long _fuzzy_time; - - private static final int MAX_SPIN = 1; - - private final long[] _t; // Power-of-2 array of longs - - CAT(CAT next, int sz, long init) { - _next = next; - _t = new long[sz]; - _t[0] = init; - } - - // Only add 'x' to some slot in table, hinted at by 'hash'. The sum can - // overflow. Value is CAS'd so no counts are lost. The CAS is attempted - // ONCE. - public long add_if(long x, int hash, ConcurrentAutoTable master) { - final long[] t = _t; - final int idx = hash & (t.length - 1); - // Peel loop; try once fast - long old = t[idx]; - final boolean ok = CAS(t, idx, old, old + x); - if (ok) return old; // Got it - // Try harder - int cnt = 0; - while (true) { - old = t[idx]; - if (CAS(t, idx, old, old + x)) break; // Got it! - cnt++; - } - if (cnt < MAX_SPIN) return old; // Allowable spin loop count - if (t.length >= 1024 * 1024) return old; // too big already - - // Too much contention; double array size in an effort to reduce contention - //long r = _resizers; - //final int newbytes = (t.length<<1)<<3/*word to bytes*/; - //while( !_resizerUpdater.compareAndSet(this,r,r+newbytes) ) - // r = _resizers; - //r += newbytes; - if (master._cat != this) return old; // Already doubled, don't bother - //if( (r>>17) != 0 ) { // Already too much allocation attempts? - // // We could use a wait with timeout, so we'll wakeup as soon as the new - // // table is ready, or after the timeout in any case. Annoyingly, this - // // breaks the non-blocking property - so for now we just briefly sleep. - // try { Thread.sleep(r>>17); } catch( InterruptedException e ) { } - // if( master._cat != this ) return old; - - CAT newcat = new CAT(this, t.length * 2, 0); - // Take 1 stab at updating the CAT with the new larger size. If this - // fails, we assume some other thread already expanded the CAT - so we - // do not need to retry until it succeeds. - while (master._cat == this && !master.CAS_cat(this, newcat)) {/* empty */ - } - return old; - } - - // Return the current sum of all things in the table. Writers can be - // updating the table furiously, so the sum is only locally accurate. - public long sum() { - long sum = _next == null ? 0 : _next.sum(); // Recursively get cached sum - final long[] t = _t; - for (long cnt : t) sum += cnt; - return sum; - } - - // Fast fuzzy version. Used a cached value until it gets old, then re-up - // the cache. - public long estimate_sum() { - // For short tables, just do the work - if (_t.length <= 64) return sum(); - // For bigger tables, periodically freshen a cached value - long millis = System.currentTimeMillis(); - if (_fuzzy_time != millis) { // Time marches on? - _fuzzy_sum_cache = sum(); // Get sum the hard way - _fuzzy_time = millis; // Indicate freshness of cached value - } - return _fuzzy_sum_cache; // Return cached sum - } - - public String toString() { - return Long.toString(sum()); - } - - public void print() { - long[] t = _t; - System.out.print("[" + t[0]); - for (int i = 1; i < t.length; i++) - System.out.print("," + t[i]); - System.out.print("]"); - if (_next != null) _next.print(); - } - } - } -}