Batcher.cs

// batching functionality encapsulated into one class.
// -> less complexity
// -> easy to test
//
// IMPORTANT: we use THRESHOLD batching, not MAX SIZE batching.
// see threshold comments below.
//
// includes timestamp for tick batching.
// -> allows NetworkTransform etc. to use timestamp without including it in
//    every single message
using System;
using System.Collections.Generic;

namespace Mirror
{
    public class Batcher
    {
        // batching threshold instead of max size.
        // -> small messages are fit into threshold-sized batches
        // -> messages larger than threshold become single batches
        //
        // in other words, we fit up to 'threshold' but still allow larger
        // ones for two reasons:
        // 1.) data races: skipping batching for larger messages would send a
        //     large spawn message immediately, while others are batched and
        //     only flushed at the end of the frame
        // 2.) timestamp batching: if each batch is expected to contain a
        //     timestamp, then large messages have to be a batch too.
        //     otherwise they would not contain a timestamp
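        //
        // example (illustrative numbers, not from the source): with
        // threshold=16, a fresh batch starts at position 8 (timestamp).
        // two 3-byte messages (1-byte varint header each) fill it to
        // 8+4+4 = 16, exactly the threshold, which is still fine.
        // a third message would push past 16, so the batch is finalized
        // first and a new one is started.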
        readonly int threshold;

        // TimeStamp header size. each batch has one.
        public const int TimestampSize = sizeof(double);

        // Message header size. each message has one.
        public static int MessageHeaderSize(int messageSize) =>
            Compression.VarUIntSize((ulong)messageSize);

        // maximum overhead for a single message.
        // useful for the outside to calculate max message sizes.
        public static int MaxMessageOverhead(int messageSize) =>
            TimestampSize + MessageHeaderSize(messageSize);
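        //
        // example: a small message's varint size prefix is typically a
        // single byte, so overhead is 8 (timestamp) + 1 = 9 bytes. larger
        // messages pay a byte or two more for the size prefix.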
        // full batches ready to be sent.
        // DO NOT queue NetworkMessage, it would box.
        // DO NOT queue each serialization separately.
        //    it would allocate too many writers.
        //    https://github.com/vis2k/Mirror/pull/3127
        // => best to build batches on the fly.
        readonly Queue<NetworkWriterPooled> batches = new Queue<NetworkWriterPooled>();

        // current batch in progress
        NetworkWriterPooled batch;

        public Batcher(int threshold)
        {
            this.threshold = threshold;
        }
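        // usage note (illustrative, values are assumptions): the threshold
        // is usually derived from the transport, e.g. new Batcher(1200)
        // for an MTU-sized unreliable transport.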
        // add a message for batching.
        // we allow messages of any size.
        // caller needs to make sure they are within max packet size.
        public void AddMessage(ArraySegment<byte> message, double timeStamp)
        {
            // predict the needed size, which is varint(size) + content
            int headerSize = Compression.VarUIntSize((ulong)message.Count);
            int neededSize = headerSize + message.Count;

            // when appending to a batch in progress, check final size.
            // if it expands beyond threshold, then we should finalize it first.
            // => less than or exactly threshold is fine.
            //    GetBatch() will finalize it.
            // => see unit tests.
            if (batch != null &&
                batch.Position + neededSize > threshold)
            {
                batches.Enqueue(batch);
                batch = null;
            }

            // initialize a new batch if necessary
            if (batch == null)
            {
                // borrow from pool. we return it in GetBatch.
                batch = NetworkWriterPool.Get();

                // write timestamp first.
                // -> double precision for accuracy over long periods of time
                // -> batches are per-frame, it doesn't matter which message's
                //    timestamp we use.
                batch.WriteDouble(timeStamp);
            }

            // add serialization to current batch. even if > threshold.
            // -> we do allow > threshold sized messages as single batch
            // -> WriteBytes instead of WriteSegment because the latter
            //    would add a size header. we want to write directly.
            //
            // include size prefix as varint!
            // -> fixes NetworkMessage serialization mismatch corrupting the
            //    next message in a batch.
            // -> a _lot_ of time was wasted debugging corrupt batches.
            //    there is no easy way to figure out which NetworkMessage has
            //    a mismatch.
            // -> this is worth everyone's sanity.
            // -> varint means we prefix with 1 byte most of the time.
            // -> the same issue in NetworkIdentity was why Mirror started!
            Compression.CompressVarUInt(batch, (ulong)message.Count);
            batch.WriteBytes(message.Array, message.Offset, message.Count);
        }
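        // resulting batch layout (recoverable from the writes above):
        //   [timestamp : 8 bytes]
        //   [varint size][message payload]
        //   [varint size][message payload]
        //   ...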
        // helper function to copy a batch to writer and return it to pool
        static void CopyAndReturn(NetworkWriterPooled batch, NetworkWriter writer)
        {
            // make sure the writer is fresh to avoid uncertain situations
            if (writer.Position != 0)
                throw new ArgumentException("GetBatch needs a fresh writer!");

            // copy to the target writer
            ArraySegment<byte> segment = batch.ToArraySegment();
            writer.WriteBytes(segment.Array, segment.Offset, segment.Count);

            // return batch to pool for reuse
            NetworkWriterPool.Return(batch);
        }
        // get the next batch which is available for sending (if any).
        // TODO safely get & return a batch instead of copying to writer?
        // TODO could return pooled writer & use GetBatch in a 'using' statement!
        public bool GetBatch(NetworkWriter writer)
        {
            // get first batch from queue (if any)
            if (batches.TryDequeue(out NetworkWriterPooled first))
            {
                CopyAndReturn(first, writer);
                return true;
            }

            // if queue was empty, we can send the batch in progress.
            if (batch != null)
            {
                CopyAndReturn(batch, writer);
                batch = null;
                return true;
            }

            // nothing was written
            return false;
        }
    }
}
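
// Illustrative usage sketch (an assumption added for clarity, not part of
// this file or of Mirror's API): how a per-connection send loop might drive
// Batcher. 'BatcherUsageSketch' and 'FakeSend' are hypothetical names;
// FakeSend stands in for a real transport send. Only APIs already visible
// in this file (NetworkWriterPool, Position, ToArraySegment) are used.
namespace Mirror.Examples
{
    public static class BatcherUsageSketch
    {
        static void FakeSend(ArraySegment<byte> data)
        {
            // a real implementation would call transport.Send(...) here.
        }

        public static void Example(Batcher batcher)
        {
            // queue a message during the frame.
            byte[] payload = { 0x01, 0x02, 0x03 };
            batcher.AddMessage(new ArraySegment<byte>(payload), timeStamp: 42.0);

            // at the end of the frame, drain all finished batches.
            NetworkWriterPooled writer = NetworkWriterPool.Get();
            while (batcher.GetBatch(writer))
            {
                FakeSend(writer.ToArraySegment());
                // GetBatch requires a fresh writer (Position == 0).
                writer.Position = 0;
            }
            NetworkWriterPool.Return(writer);
        }
    }
}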