RingBuffer.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. // The following dependency was taken from https://github.com/dave-hillier/disruptor-unity3d
  2. // The Apache License 2.0 this dependency follows is located at https://github.com/dave-hillier/disruptor-unity3d/blob/master/LICENSE.
  3. // Modifications were made by SoftwareGuy (Coburn).
  4. using System.Runtime.CompilerServices;
  5. using System.Runtime.InteropServices;
  6. using System.Threading;
  7. namespace IgnoranceThirdparty
  8. {
  9. /// <summary>
  10. /// Implementation of the Disruptor pattern
  11. /// </summary>
  12. /// <typeparam name="T">the type of item to be stored</typeparam>
  13. public class RingBuffer<T>
  14. {
  15. private readonly T[] _entries;
  16. private readonly int _modMask;
  17. private Volatile.PaddedLong _consumerCursor = new Volatile.PaddedLong();
  18. private Volatile.PaddedLong _producerCursor = new Volatile.PaddedLong();
  19. /// <summary>
  20. /// Creates a new RingBuffer with the given capacity
  21. /// </summary>
  22. /// <param name="capacity">The capacity of the buffer</param>
  23. /// <remarks>Only a single thread may attempt to consume at any one time</remarks>
  24. public RingBuffer(int capacity)
  25. {
  26. capacity = NextPowerOfTwo(capacity);
  27. _modMask = capacity - 1;
  28. _entries = new T[capacity];
  29. }
  30. /// <summary>
  31. /// The maximum number of items that can be stored
  32. /// </summary>
  33. public int Capacity
  34. {
  35. get { return _entries.Length; }
  36. }
  37. public T this[long index]
  38. {
  39. get { unchecked { return _entries[index & _modMask]; } }
  40. set { unchecked { _entries[index & _modMask] = value; } }
  41. }
  42. /// <summary>
  43. /// Removes an item from the buffer.
  44. /// </summary>
  45. /// <returns>The next available item</returns>
  46. public T Dequeue()
  47. {
  48. var next = _consumerCursor.ReadAcquireFence() + 1;
  49. while (_producerCursor.ReadAcquireFence() < next) // makes sure we read the data from _entries after we have read the producer cursor
  50. {
  51. Thread.SpinWait(1);
  52. }
  53. var result = this[next];
  54. _consumerCursor.WriteReleaseFence(next); // makes sure we read the data from _entries before we update the consumer cursor
  55. return result;
  56. }
  57. /// <summary>
  58. /// Attempts to remove an items from the queue
  59. /// </summary>
  60. /// <param name="obj">the items</param>
  61. /// <returns>True if successful</returns>
  62. public bool TryDequeue(out T obj)
  63. {
  64. var next = _consumerCursor.ReadAcquireFence() + 1;
  65. if (_producerCursor.ReadAcquireFence() < next)
  66. {
  67. obj = default(T);
  68. return false;
  69. }
  70. obj = Dequeue();
  71. return true;
  72. }
  73. /// <summary>
  74. /// Add an item to the buffer
  75. /// </summary>
  76. /// <param name="item"></param>
  77. public void Enqueue(T item)
  78. {
  79. var next = _producerCursor.ReadAcquireFence() + 1;
  80. long wrapPoint = next - _entries.Length;
  81. long min = _consumerCursor.ReadAcquireFence();
  82. while (wrapPoint > min)
  83. {
  84. min = _consumerCursor.ReadAcquireFence();
  85. Thread.SpinWait(1);
  86. }
  87. this[next] = item;
  88. _producerCursor.WriteReleaseFence(next); // makes sure we write the data in _entries before we update the producer cursor
  89. }
  90. /// <summary>
  91. /// The number of items in the buffer
  92. /// </summary>
  93. /// <remarks>for indicative purposes only, may contain stale data</remarks>
  94. public int Count { get { return (int)(_producerCursor.ReadFullFence() - _consumerCursor.ReadFullFence()); } }
  95. private static int NextPowerOfTwo(int x)
  96. {
  97. var result = 2;
  98. while (result < x)
  99. {
  100. result <<= 1;
  101. }
  102. return result;
  103. }
  104. }
  105. public static class Volatile
  106. {
  107. private const int CacheLineSize = 64;
  108. [StructLayout(LayoutKind.Explicit, Size = CacheLineSize * 2)]
  109. public struct PaddedLong
  110. {
  111. [FieldOffset(CacheLineSize)]
  112. private long _value;
  113. /// <summary>
  114. /// Create a new <see cref="PaddedLong"/> with the given initial value.
  115. /// </summary>
  116. /// <param name="value">Initial value</param>
  117. public PaddedLong(long value)
  118. {
  119. _value = value;
  120. }
  121. /// <summary>
  122. /// Read the value without applying any fence
  123. /// </summary>
  124. /// <returns>The current value</returns>
  125. public long ReadUnfenced()
  126. {
  127. return _value;
  128. }
  129. /// <summary>
  130. /// Read the value applying acquire fence semantic
  131. /// </summary>
  132. /// <returns>The current value</returns>
  133. public long ReadAcquireFence()
  134. {
  135. var value = _value;
  136. Thread.MemoryBarrier();
  137. return value;
  138. }
  139. /// <summary>
  140. /// Read the value applying full fence semantic
  141. /// </summary>
  142. /// <returns>The current value</returns>
  143. public long ReadFullFence()
  144. {
  145. Thread.MemoryBarrier();
  146. return _value;
  147. }
  148. /// <summary>
  149. /// Read the value applying a compiler only fence, no CPU fence is applied
  150. /// </summary>
  151. /// <returns>The current value</returns>
  152. [MethodImpl(MethodImplOptions.NoOptimization)]
  153. public long ReadCompilerOnlyFence()
  154. {
  155. return _value;
  156. }
  157. /// <summary>
  158. /// Write the value applying release fence semantic
  159. /// </summary>
  160. /// <param name="newValue">The new value</param>
  161. public void WriteReleaseFence(long newValue)
  162. {
  163. Thread.MemoryBarrier();
  164. _value = newValue;
  165. }
  166. /// <summary>
  167. /// Write the value applying full fence semantic
  168. /// </summary>
  169. /// <param name="newValue">The new value</param>
  170. public void WriteFullFence(long newValue)
  171. {
  172. Thread.MemoryBarrier();
  173. _value = newValue;
  174. }
  175. /// <summary>
  176. /// Write the value applying a compiler fence only, no CPU fence is applied
  177. /// </summary>
  178. /// <param name="newValue">The new value</param>
  179. [MethodImpl(MethodImplOptions.NoOptimization)]
  180. public void WriteCompilerOnlyFence(long newValue)
  181. {
  182. _value = newValue;
  183. }
  184. /// <summary>
  185. /// Write without applying any fence
  186. /// </summary>
  187. /// <param name="newValue">The new value</param>
  188. public void WriteUnfenced(long newValue)
  189. {
  190. _value = newValue;
  191. }
  192. /// <summary>
  193. /// Atomically set the value to the given updated value if the current value equals the comparand
  194. /// </summary>
  195. /// <param name="newValue">The new value</param>
  196. /// <param name="comparand">The comparand (expected value)</param>
  197. /// <returns></returns>
  198. public bool AtomicCompareExchange(long newValue, long comparand)
  199. {
  200. return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
  201. }
  202. /// <summary>
  203. /// Atomically set the value to the given updated value
  204. /// </summary>
  205. /// <param name="newValue">The new value</param>
  206. /// <returns>The original value</returns>
  207. public long AtomicExchange(long newValue)
  208. {
  209. return Interlocked.Exchange(ref _value, newValue);
  210. }
  211. /// <summary>
  212. /// Atomically add the given value to the current value and return the sum
  213. /// </summary>
  214. /// <param name="delta">The value to be added</param>
  215. /// <returns>The sum of the current value and the given value</returns>
  216. public long AtomicAddAndGet(long delta)
  217. {
  218. return Interlocked.Add(ref _value, delta);
  219. }
  220. /// <summary>
  221. /// Atomically increment the current value and return the new value
  222. /// </summary>
  223. /// <returns>The incremented value.</returns>
  224. public long AtomicIncrementAndGet()
  225. {
  226. return Interlocked.Increment(ref _value);
  227. }
  228. /// <summary>
  229. /// Atomically increment the current value and return the new value
  230. /// </summary>
  231. /// <returns>The decremented value.</returns>
  232. public long AtomicDecrementAndGet()
  233. {
  234. return Interlocked.Decrement(ref _value);
  235. }
  236. /// <summary>
  237. /// Returns the string representation of the current value.
  238. /// </summary>
  239. /// <returns>the string representation of the current value.</returns>
  240. public override string ToString()
  241. {
  242. var value = ReadFullFence();
  243. return value.ToString();
  244. }
  245. }
  246. }
  247. }