// The following dependency was taken from https://github.com/dave-hillier/disruptor-unity3d
// The Apache License 2.0 this dependency follows is located at https://github.com/dave-hillier/disruptor-unity3d/blob/master/LICENSE.
// Modifications were made by SoftwareGuy (Coburn).

using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;

namespace IgnoranceThirdparty
{
    /// <summary>
    /// Implementation of the Disruptor pattern: a fixed-size circular buffer whose
    /// producer and consumer positions are tracked by two monotonically increasing cursors.
    /// </summary>
    /// <typeparam name="T">the type of item to be stored</typeparam>
    public class RingBuffer<T>
    {
        // Largest power of two representable in a positive Int32. Rounding any larger
        // request up to a power of two would overflow the shift in NextPowerOfTwo.
        private const int MaxCapacity = 1 << 30;

        private readonly T[] _entries;

        // Capacity - 1. Because the capacity is a power of two, (index & _modMask)
        // is equivalent to (index % capacity) for non-negative indices.
        private readonly int _modMask;

        // Deliberately NOT readonly: PaddedLong is a mutable struct, and calls on a
        // readonly struct field would operate on a defensive copy.
        private Volatile.PaddedLong _consumerCursor = new Volatile.PaddedLong();
        private Volatile.PaddedLong _producerCursor = new Volatile.PaddedLong();

        /// <summary>
        /// Creates a new RingBuffer with the given capacity
        /// </summary>
        /// <param name="capacity">
        /// The capacity of the buffer. It is rounded up to the next power of two,
        /// with a minimum of 2.
        /// </param>
        /// <remarks>Only a single thread may attempt to consume at any one time</remarks>
        /// <exception cref="System.ArgumentOutOfRangeException">
        /// <paramref name="capacity"/> is greater than 2^30 and therefore cannot be
        /// rounded up to a power of two that fits in an <see cref="int"/>.
        /// </exception>
        public RingBuffer(int capacity)
        {
            // Without this guard NextPowerOfTwo would overflow to a non-positive value
            // and spin forever.
            if (capacity > MaxCapacity)
            {
                throw new System.ArgumentOutOfRangeException(
                    "capacity",
                    capacity,
                    "Capacity must not exceed 1073741824 (2^30).");
            }

            capacity = NextPowerOfTwo(capacity);
            _modMask = capacity - 1;
            _entries = new T[capacity];
        }

        /// <summary>
        /// The maximum number of items that can be stored
        /// </summary>
        public int Capacity
        {
            get { return _entries.Length; }
        }

        /// <summary>
        /// Gets or sets the slot that the given cursor position maps to.
        /// The position is wrapped into the backing array using the capacity mask.
        /// </summary>
        /// <param name="index">A cursor position; it is masked, not range-checked.</param>
        public T this[long index]
        {
            get
            {
                unchecked
                {
                    return _entries[index & _modMask];
                }
            }
            set
            {
                unchecked
                {
                    _entries[index & _modMask] = value;
                }
            }
        }

        /// <summary>
        /// Removes an item from the buffer, spinning until one is available.
        /// </summary>
        /// <returns>The next available item</returns>
        public T Dequeue()
        {
            var next = _consumerCursor.ReadAcquireFence() + 1;

            // Spin until the producer has published position `next`.
            // The acquire fence makes sure we read the data from _entries
            // after we have read the producer cursor.
            while (_producerCursor.ReadAcquireFence() < next)
            {
                Thread.SpinWait(1);
            }

            var result = this[next];

            // Makes sure we read the data from _entries before we update the consumer cursor.
            _consumerCursor.WriteReleaseFence(next);
            return result;
        }

        /// <summary>
        /// Attempts to remove an item from the queue without waiting.
        /// </summary>
        /// <param name="obj">
        /// The dequeued item, or the default value of <typeparamref name="T"/> when the
        /// buffer was empty.
        /// </param>
        /// <returns>True if successful</returns>
        public bool TryDequeue(out T obj)
        {
            var next = _consumerCursor.ReadAcquireFence() + 1;

            if (_producerCursor.ReadAcquireFence() < next)
            {
                obj = default(T);
                return false;
            }

            // An item is already published, so Dequeue's wait loop exits immediately.
            obj = Dequeue();
            return true;
        }

        /// <summary>
        /// Add an item to the buffer, spinning while the buffer is full.
        /// </summary>
        /// <param name="item">The item to store.</param>
        public void Enqueue(T item)
        {
            var next = _producerCursor.ReadAcquireFence() + 1;

            // Writing position `next` reuses the slot of position (next - capacity);
            // that slot is only free once the consumer cursor has reached it.
            long wrapPoint = next - _entries.Length;
            long min = _consumerCursor.ReadAcquireFence();

            while (wrapPoint > min)
            {
                min = _consumerCursor.ReadAcquireFence();
                Thread.SpinWait(1);
            }

            this[next] = item;

            // Makes sure we write the data in _entries before we update the producer cursor.
            _producerCursor.WriteReleaseFence(next);
        }

        /// <summary>
        /// The number of items in the buffer
        /// </summary>
        /// <remarks>
        /// For indicative purposes only, may contain stale data: the two cursors are
        /// read one after the other, not as a single atomic snapshot.
        /// </remarks>
        public int Count
        {
            get { return (int)(_producerCursor.ReadFullFence() - _consumerCursor.ReadFullFence()); }
        }

        /// <summary>
        /// Returns the smallest power of two that is at least 2 and not less than
        /// <paramref name="x"/>.
        /// </summary>
        /// <param name="x">The requested size; callers must ensure it does not exceed 2^30.</param>
        private static int NextPowerOfTwo(int x)
        {
            var result = 2;
            while (result < x)
            {
                result <<= 1;
            }
            return result;
        }
    }

    /// <summary>
    /// Container for padded primitive wrappers with explicitly fenced reads and writes.
    /// </summary>
    public static class Volatile
    {
        private const int CacheLineSize = 64;

        /// <summary>
        /// A <see cref="long"/> stored at offset 64 inside a 128-byte struct, leaving
        /// padding on both sides of the value (presumably to avoid sharing a cache line
        /// with neighbouring fields).
        /// </summary>
        [StructLayout(LayoutKind.Explicit, Size = CacheLineSize * 2)]
        public struct PaddedLong
        {
            [FieldOffset(CacheLineSize)]
            private long _value;

            /// <summary>
            /// Create a new <see cref="PaddedLong"/> with the given initial value.
            /// </summary>
            /// <param name="value">Initial value</param>
            public PaddedLong(long value)
            {
                _value = value;
            }

            /// <summary>
            /// Read the value without applying any fence
            /// </summary>
            /// <returns>The current value</returns>
            public long ReadUnfenced()
            {
                return _value;
            }

            /// <summary>
            /// Read the value applying acquire fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public long ReadAcquireFence()
            {
                // Load first, then fence, so later memory operations cannot move
                // ahead of this read.
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying full fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public long ReadFullFence()
            {
                Thread.MemoryBarrier();
                return _value;
            }

            /// <summary>
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// </summary>
            /// <returns>The current value</returns>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public long ReadCompilerOnlyFence()
            {
                return _value;
            }

            /// <summary>
            /// Write the value applying release fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteReleaseFence(long newValue)
            {
                // Fence first, then store, so earlier memory operations cannot move
                // past this write.
                Thread.MemoryBarrier();
                _value = newValue;
            }

            /// <summary>
            /// Write the value applying full fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteFullFence(long newValue)
            {
                // NOTE(review): this body is identical to WriteReleaseFence (barrier before
                // the store only). Confirm whether a trailing barrier was intended.
                Thread.MemoryBarrier();
                _value = newValue;
            }

            /// <summary>
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// </summary>
            /// <param name="newValue">The new value</param>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(long newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Write without applying any fence
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteUnfenced(long newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <param name="comparand">The comparand (expected value)</param>
            /// <returns>True if the value was equal to the comparand and has been replaced</returns>
            public bool AtomicCompareExchange(long newValue, long comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }

            /// <summary>
            /// Atomically set the value to the given updated value
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <returns>The original value</returns>
            public long AtomicExchange(long newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }

            /// <summary>
            /// Atomically add the given value to the current value and return the sum
            /// </summary>
            /// <param name="delta">The value to be added</param>
            /// <returns>The sum of the current value and the given value</returns>
            public long AtomicAddAndGet(long delta)
            {
                return Interlocked.Add(ref _value, delta);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The incremented value.</returns>
            public long AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }

            /// <summary>
            /// Atomically decrement the current value and return the new value
            /// </summary>
            /// <returns>The decremented value.</returns>
            public long AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }

            /// <summary>
            /// Returns the string representation of the current value.
            /// </summary>
            /// <returns>the string representation of the current value.</returns>
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }
    }
}