MagnificentSendPipe.cs

// a magnificent send pipe to shield us from all of life's complexities.
// safely sends messages from the main thread to the send thread.
// -> thread safety built in
// -> byte[] pooling built in to avoid allocations
//
// => hides all the complexity from telepathy
// => easy to switch between stack/queue/concurrentqueue/etc.
// => easy to test
using System;
using System.Collections.Generic;

namespace Telepathy
{
    public class MagnificentSendPipe
    {
        // message queue
        // ConcurrentQueue allocates. lock{} instead.
        // -> byte arrays are always of MaxMessageSize
        // -> ArraySegment indicates the actual message content
        //
        // IMPORTANT: lock{} all usages!
        readonly Queue<ArraySegment<byte>> queue = new Queue<ArraySegment<byte>>();

        // byte[] pool to avoid allocations
        // Take & Return is beautifully encapsulated in the pipe.
        // the outside does not need to worry about anything.
        // and it can be tested easily.
        //
        // IMPORTANT: lock{} all usages!
        Pool<byte[]> pool;

        // constructor
        public MagnificentSendPipe(int MaxMessageSize)
        {
            // initialize the pool to create max message sized byte[]s each time
            pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
        }
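
        // NOTE: Pool<T> lives elsewhere in Telepathy. For reference, a minimal
        // sketch of the API used here, assuming a simple stack-backed design
        // (the real implementation may differ):
        //
        //   public class Pool<T>
        //   {
        //       readonly Stack<T> objects = new Stack<T>();
        //       readonly Func<T> objectGenerator; // creates a new T when empty
        //
        //       public Pool(Func<T> objectGenerator)
        //       {
        //           this.objectGenerator = objectGenerator;
        //       }
        //
        //       public T Take() => objects.Count > 0 ? objects.Pop() : objectGenerator();
        //       public void Return(T item) => objects.Push(item);
        //       public int Count() => objects.Count;
        //   }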
        // for statistics. don't call Count and assume that it's still the
        // same after the call.
        public int Count
        {
            get { lock (this) { return queue.Count; } }
        }

        // pool count for testing
        public int PoolCount
        {
            get { lock (this) { return pool.Count(); } }
        }

        // enqueue a message
        // arraysegment for allocation free sends later.
        // -> the segment's array is only used until Enqueue() returns!
        public void Enqueue(ArraySegment<byte> message)
        {
            // pool & queue usage always needs to be locked
            lock (this)
            {
                // the ArraySegment's array is only valid until we return, so
                // copy it into a byte[] that we can queue safely.
                // get one from the pool first to avoid allocations
                byte[] bytes = pool.Take();

                // copy the message content into it
                Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);

                // indicate which part is the message
                ArraySegment<byte> segment = new ArraySegment<byte>(bytes, 0, message.Count);

                // now enqueue it
                queue.Enqueue(segment);
            }
        }
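
        // because Enqueue() copies the content, callers can safely reuse
        // their own buffer as soon as it returns. Illustrative sketch (the
        // 'pipe' and 'sendBuffer' names are assumptions, not Telepathy API):
        //
        //   byte[] sendBuffer = new byte[2];
        //   sendBuffer[0] = 0x01;
        //   pipe.Enqueue(new ArraySegment<byte>(sendBuffer, 0, 1));
        //   sendBuffer[0] = 0x02; // safe: the pipe already copied the content
        //   pipe.Enqueue(new ArraySegment<byte>(sendBuffer, 0, 1));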
        // send threads need to dequeue each byte[] and write it into the socket
        // -> dequeueing one byte[] after another works, but it's WAY slower
        //    than dequeueing all immediately (locks only once)
        // -> lock{} & DequeueAll is WAY faster than ConcurrentQueue & dequeueing
        //    one after another:
        //
        //      uMMORPG 450 CCU
        //        SafeQueue:       900-1440ms latency
        //        ConcurrentQueue:    2000ms latency
        //
        // -> the most obvious solution is to just return a list with all byte[]
        //    (which allocates) and then write each one into the socket
        // -> a faster solution is to serialize each one into one payload buffer
        //    and pass that to the socket only once. fewer socket calls always
        //    give WAY better CPU performance(!)
        // -> to avoid allocating a new list of entries each time, we simply
        //    serialize all entries into the payload here already
        // => having all this complexity built into the pipe makes testing and
        //    modifying the algorithm super easy!
        //
        // IMPORTANT: serializing in here allows us to return the byte[]
        //            entries back to the pool, completely avoiding
        //            allocations!
        public bool DequeueAndSerializeAll(ref byte[] payload, out int packetSize)
        {
            // pool & queue usage always needs to be locked
            lock (this)
            {
                // do nothing if empty
                packetSize = 0;
                if (queue.Count == 0)
                    return false;

                // we might have multiple pending messages. merge them into one
                // packet to avoid TCP overheads and improve performance.
                //
                // IMPORTANT: Mirror & DOTSNET already batch into MaxMessageSize
                //            chunks, but we STILL pack all pending messages
                //            into one large payload so we only give it to TCP
                //            ONCE. This is HUGE for performance so we keep it!
                foreach (ArraySegment<byte> message in queue)
                    packetSize += 4 + message.Count; // header + content

                // create the payload buffer if it wasn't created yet or if the
                // previous one is too small
                // IMPORTANT: payload.Length might be > packetSize! don't use it!
                if (payload == null || payload.Length < packetSize)
                    payload = new byte[packetSize];

                // dequeue all byte[] messages and serialize them into the packet
                int position = 0;
                while (queue.Count > 0)
                {
                    // dequeue
                    ArraySegment<byte> message = queue.Dequeue();

                    // write the four byte size header into the buffer at 'position'
                    Utils.IntToBytesBigEndianNonAlloc(message.Count, payload, position);
                    position += 4;

                    // copy the message into the payload at 'position'
                    Buffer.BlockCopy(message.Array, message.Offset, payload, position, message.Count);
                    position += message.Count;

                    // return it to the pool so it can be reused
                    // (this is what avoids allocations!)
                    pool.Return(message.Array);
                }

                // we did serialize something
                return true;
            }
        }
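
        // NOTE: Utils.IntToBytesBigEndianNonAlloc is defined elsewhere in
        // Telepathy. A minimal sketch of what it's assumed to do (write an
        // int as a four byte big endian header without allocating):
        //
        //   public static void IntToBytesBigEndianNonAlloc(int value, byte[] bytes, int offset = 0)
        //   {
        //       bytes[offset + 0] = (byte)(value >> 24);
        //       bytes[offset + 1] = (byte)(value >> 16);
        //       bytes[offset + 2] = (byte)(value >> 8);
        //       bytes[offset + 3] = (byte)value;
        //   }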
        public void Clear()
        {
            // pool & queue usage always needs to be locked
            lock (this)
            {
                // clear queue, but via dequeue to return each byte[] to pool
                while (queue.Count > 0)
                {
                    pool.Return(queue.Dequeue().Array);
                }
            }
        }
    }
}
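
// -----------------------------------------------------------------------------
// usage sketch (illustrative, not part of the original file): how the main
// thread and a send thread might drive the pipe. 'data' and 'stream' are
// assumed names and error handling is omitted:
//
//   var pipe = new MagnificentSendPipe(16 * 1024); // MaxMessageSize assumption
//
//   // main thread: enqueue. the pipe copies the content internally.
//   pipe.Enqueue(new ArraySegment<byte>(data));
//
//   // send thread: drain everything into one payload, hand it to TCP once.
//   byte[] payload = null;
//   if (pipe.DequeueAndSerializeAll(ref payload, out int packetSize))
//       stream.Write(payload, 0, packetSize);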