MagnificentReceivePipe.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. // a magnificent receive pipe to shield us from all of life's complexities.
  2. // safely sends messages from receive thread to main thread.
  3. // -> thread safety built in
  4. // -> byte[] pooling built in (buffers are reused to avoid allocations)
  5. //
  6. // => hides all the complexity from telepathy
  7. // => easy to switch between stack/queue/concurrentqueue/etc.
  8. // => easy to test
  9. using System;
  10. using System.Collections.Generic;
  11. namespace Telepathy
  12. {
  // Thread-safe pipe that carries received messages from the receive thread
  // to the main thread.
  // -> message bytes are copied into pooled byte[]s, so the caller's
  //    ArraySegment only needs to stay valid until Enqueue returns.
  // -> consume with TryPeek (process the message) followed by TryDequeue
  //    (release the byte[] back to the pool).
  public class MagnificentReceivePipe
  {
      // queue entry. only used in here.
      // -> 'data' wraps a pooled byte[] of MaxMessageSize bytes (see the
      //    pool generator in the constructor), or is default if the event
      //    carries no payload.
      // -> the ArraySegment indicates which part of the byte[] is the
      //    actual message content.
      struct Entry
      {
          public readonly int connectionId;
          public readonly EventType eventType;
          public readonly ArraySegment<byte> data;

          public Entry(int connectionId, EventType eventType, ArraySegment<byte> data)
          {
              this.connectionId = connectionId;
              this.eventType = eventType;
              this.data = data;
          }
      }

      // guards queue, pool and queueCounter.
      // a private object is used instead of lock(this) so that code
      // outside of this class can never take the same monitor and
      // accidentally block or deadlock the pipe.
      //
      // IMPORTANT: lock (gate) around all usages of the fields below!
      readonly object gate = new object();

      // message queue
      // ConcurrentQueue allocates. lock{} instead.
      readonly Queue<Entry> queue = new Queue<Entry>();

      // byte[] pool to avoid allocations.
      // Take & Return are fully encapsulated in the pipe, the outside does
      // not need to worry about anything.
      readonly Pool<byte[]> pool;

      // one pipe is shared by all connections, so we keep a
      // per-connectionId counter of queued messages. this allows callers
      // to limit how many messages a single (spamming) connection may have
      // queued at once.
      readonly Dictionary<int, int> queueCounter = new Dictionary<int, int>();

      // size of every pooled byte[]; messages larger than this can't be
      // stored.
      readonly int maxMessageSize;

      // constructor
      // MaxMessageSize: size in bytes of each pooled buffer, i.e. the
      //                 largest message that can be enqueued.
      public MagnificentReceivePipe(int MaxMessageSize)
      {
          maxMessageSize = MaxMessageSize;

          // initialize pool to create max message sized byte[]s each time
          pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
      }

      // return amount of queued messages for this connectionId (0 if none).
      // for statistics. another thread may enqueue/dequeue right after the
      // call, so don't assume the value is still accurate afterwards.
      public int Count(int connectionId)
      {
          lock (gate)
          {
              return queueCounter.TryGetValue(connectionId, out int count)
                  ? count
                  : 0;
          }
      }

      // total amount of queued messages across all connections
      public int TotalCount
      {
          get { lock (gate) { return queue.Count; } }
      }

      // pool count for testing
      public int PoolCount
      {
          get { lock (gate) { return pool.Count(); } }
      }

      // enqueue a message
      // -> the ArraySegment content is copied into a pooled byte[], because
      //    'message' is only valid until this method returns.
      // -> pass default as 'message' for events without a payload.
      // throws ArgumentException if the message is larger than the
      // MaxMessageSize passed to the constructor.
      public void Enqueue(int connectionId, EventType eventType, ArraySegment<byte> message)
      {
          // does this message have a data array content?
          bool hasData = message != default;

          // validate before touching the pool. Buffer.BlockCopy would throw
          // ArgumentException for an oversized message anyway, but only
          // after a buffer was already taken out of the pool.
          if (hasData && message.Count > maxMessageSize)
          {
              throw new ArgumentException(
                  $"Message of {message.Count} bytes exceeds the max message size of {maxMessageSize} bytes.",
                  nameof(message));
          }

          lock (gate)
          {
              ArraySegment<byte> segment = default;
              if (hasData)
              {
                  // get a byte[] from the pool to avoid allocations, copy
                  // the message into it and remember which part is used.
                  byte[] bytes = pool.Take();
                  Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);
                  segment = new ArraySegment<byte>(bytes, 0, message.Count);
              }

              // IMPORTANT: queue the segment around the pooled byte[],
              //            NOT 'message' which is only valid until returning!
              queue.Enqueue(new Entry(connectionId, eventType, segment));

              // increase counter for this connectionId.
              // TryGetValue leaves oldCount at 0 if there is no entry yet.
              queueCounter.TryGetValue(connectionId, out int oldCount);
              queueCounter[connectionId] = oldCount + 1;
          }
      }

      // peek the next message
      // -> returns false (and default out values) if the queue is empty.
      // -> allows the caller to process the message while the pipe still
      //    holds on to the byte[].
      // -> TryDequeue should be called after processing, so that the
      //    message is actually dequeued and the byte[] is returned to pool!
      //
      // IMPORTANT: TryPeek & TryDequeue need to be called from the SAME THREAD!
      public bool TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> data)
      {
          connectionId = 0;
          eventType = EventType.Disconnected;
          data = default;

          lock (gate)
          {
              if (queue.Count == 0)
              {
                  return false;
              }

              Entry entry = queue.Peek();
              connectionId = entry.connectionId;
              eventType = entry.eventType;
              data = entry.data;
              return true;
          }
      }

      // dequeue the next message
      // -> returns false if the queue is empty.
      // -> simply dequeues and returns the byte[] to pool (if any).
      // -> doesn't return the element, because the caller must not work
      //    with a byte[] that was already returned to the pool. use TryPeek
      //    to process the element first.
      //
      // IMPORTANT: TryPeek & TryDequeue need to be called from the SAME THREAD!
      public bool TryDequeue()
      {
          lock (gate)
          {
              if (queue.Count == 0)
              {
                  return false;
              }

              Entry entry = queue.Dequeue();
              ReturnToPool(entry);

              // decrease counter for this connectionId.
              // remove it when reaching zero. we don't want to keep old
              // connectionIds forever, it would cause slowly growing memory.
              int remaining = queueCounter[entry.connectionId] - 1;
              if (remaining == 0)
              {
                  queueCounter.Remove(entry.connectionId);
              }
              else
              {
                  queueCounter[entry.connectionId] = remaining;
              }
              return true;
          }
      }

      // remove all queued messages and reset all per-connection counters.
      public void Clear()
      {
          lock (gate)
          {
              // clear queue, but via dequeue to return each byte[] to pool
              while (queue.Count > 0)
              {
                  ReturnToPool(queue.Dequeue());
              }

              // clear counter too
              queueCounter.Clear();
          }
      }

      // return the entry's byte[] to the pool (if any).
      // not all message types have byte[] contents.
      // IMPORTANT: only call while holding the lock!
      void ReturnToPool(Entry entry)
      {
          if (entry.data != default)
          {
              pool.Return(entry.data.Array);
          }
      }
  }
  198. }