SendLoop.cs 6.9 KB


  1. using System;
  2. using System.IO;
  3. using System.Net.Sockets;
  4. using System.Security.Cryptography;
  5. using System.Threading;
  6. using UnityEngine.Profiling;
  7. namespace Mirror.SimpleWeb
  8. {
  /// <summary>
  /// Process-wide switches consulted by <c>SendLoop.Loop</c> each time a send thread wakes up.
  /// Both flags are <c>volatile</c> and default to <c>false</c>.
  /// </summary>
  public static class SendLoopConfig
  {
      /// <summary>
      /// When true, the send loop packs every queued message into its write buffer and flushes
      /// them to the stream together; when false, each message is written to the stream on its own.
      /// </summary>
      public static volatile bool batchSend;

      /// <summary>
      /// When true, the send loop sleeps for 1ms after being signalled and before draining the
      /// queue, giving the producer time to enqueue more messages.
      /// </summary>
      public static volatile bool sleepBeforeSend;
  }
  14. internal static class SendLoop
  15. {
  /// <summary>
  /// Arguments for one send thread, bundled so they can be handed to <c>Loop</c> as a single value.
  /// All fields are set once by the constructor.
  /// </summary>
  public struct Config
  {
      /// <summary>Connection whose send queue the loop drains. Never null when built via the constructor.</summary>
      public readonly Connection conn;

      /// <summary>Size in bytes of the write buffer the loop allocates.</summary>
      public readonly int bufferSize;

      /// <summary>Whether outgoing frames are written with a mask.</summary>
      public readonly bool setMask;

      /// <exception cref="ArgumentNullException"><paramref name="conn"/> is null.</exception>
      public Config(Connection conn, int bufferSize, bool setMask)
      {
          if (conn is null)
          {
              throw new ArgumentNullException(nameof(conn));
          }

          this.conn = conn;
          this.bufferSize = bufferSize;
          this.setMask = setMask;
      }

      /// <summary>
      /// Enables <c>(conn, bufferSize, setMask) = config</c> deconstruction.
      /// </summary>
      public void Deconstruct(out Connection conn, out int bufferSize, out bool setMask)
      {
          setMask = this.setMask;
          bufferSize = this.bufferSize;
          conn = this.conn;
      }
  }
  /// <summary>
  /// Body of a connection's send thread. Blocks on <c>conn.sendPending</c>, then drains
  /// <c>conn.sendQueue</c>, framing each message into a buffer owned by this thread and writing
  /// it to <c>conn.stream</c>. The loop ends when the client is no longer connected or an
  /// exception is thrown; in every case the connection is disposed before returning.
  /// </summary>
  /// <param name="config">
  /// Connection to serve, size of the write buffer in bytes, and whether frames are masked.
  /// </param>
  public static void Loop(Config config)
  {
      (Connection conn, int bufferSize, bool setMask) = config;

      Profiler.BeginThreadProfiling("SimpleWeb", $"SendLoop {conn.connId}");

      // create write buffer for this thread
      byte[] writeBuffer = new byte[bufferSize];
      MaskHelper maskHelper = setMask ? new MaskHelper() : null;
      try
      {
          TcpClient client = conn.client;
          Stream stream = conn.stream;

          // null check in case disconnect while send thread is starting
          if (client == null)
          {
              return;
          }

          while (client.Connected)
          {
              // wait for message
              conn.sendPending.Wait();

              // wait for 1ms for mirror to send other messages
              if (SendLoopConfig.sleepBeforeSend)
              {
                  Thread.Sleep(1);
              }

              conn.sendPending.Reset();

              if (SendLoopConfig.batchSend)
              {
                  int offset = 0;
                  while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
                  {
                      try
                      {
                          // check if connected before sending message
                          if (!client.Connected) { Log.Info($"SendLoop {conn} not connected"); return; }

                          int maxLength = msg.count + Constants.HeaderSize + Constants.MaskSize;

                          // if next writer could overflow, write to stream and clear buffer
                          if (offset + maxLength > bufferSize)
                          {
                              stream.Write(writeBuffer, 0, offset);
                              offset = 0;
                          }

                          offset = SendMessage(writeBuffer, offset, msg, setMask, maskHelper);
                      }
                      finally
                      {
                          // release on every path (sent, disconnected, or exception) so the
                          // dequeued buffer is never left un-released
                          msg.Release();
                      }
                  }

                  // after no message in queue, send remaining messages.
                  // offset is still 0 if the queue was already empty when we woke up,
                  // in which case there is nothing to flush
                  if (offset > 0)
                  {
                      stream.Write(writeBuffer, 0, offset);
                  }
              }
              else
              {
                  while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
                  {
                      try
                      {
                          // check if connected before sending message
                          if (!client.Connected) { Log.Info($"SendLoop {conn} not connected"); return; }

                          int length = SendMessage(writeBuffer, 0, msg, setMask, maskHelper);
                          stream.Write(writeBuffer, 0, length);
                      }
                      finally
                      {
                          // release on every path (sent, disconnected, or exception)
                          msg.Release();
                      }
                  }
              }
          }

          Log.Info($"{conn} Not Connected");
      }
      catch (ThreadInterruptedException e) { Log.InfoException(e); }
      catch (ThreadAbortException e) { Log.InfoException(e); }
      catch (Exception e)
      {
          Log.Exception(e);
      }
      finally
      {
          Profiler.EndThreadProfiling();
          conn.Dispose();
          maskHelper?.Dispose();
      }
  }
  /// <summary>
  /// Frames one message into <paramref name="buffer"/> starting at <paramref name="startOffset"/>:
  /// header first, then (when <paramref name="setMask"/> is true) a mask from
  /// <paramref name="maskHelper"/>, then the payload, which is masked in place.
  /// </summary>
  /// <returns>new offset in buffer</returns>
  static int SendMessage(byte[] buffer, int startOffset, ArrayBuffer msg, bool setMask, MaskHelper maskHelper)
  {
      int payloadLength = msg.count;

      // position right after the header (and after the mask, when one is written)
      int payloadStart = WriteHeader(buffer, startOffset, payloadLength, setMask);
      if (setMask)
      {
          payloadStart = maskHelper.WriteMask(buffer, payloadStart);
      }

      msg.CopyTo(buffer, payloadStart);
      int endOffset = payloadStart + payloadLength;

      // dump while the payload is still unmasked
      // NOTE(review): the last argument is the end offset, not a byte count — confirm this
      // matches what Log.DumpBuffer expects when startOffset is non-zero
      Log.DumpBuffer("Send", buffer, startOffset, endOffset);

      if (setMask)
      {
          // the mask sits immediately before the payload in the same buffer
          int maskStart = payloadStart - Constants.MaskSize;
          MessageProcessor.ToggleMask(buffer, payloadStart, payloadLength, buffer, maskStart);
      }

      return endOffset;
  }
  /// <summary>
  /// Writes a frame header for a single, final, binary message of <paramref name="msgLength"/>
  /// bytes into <paramref name="buffer"/> at <paramref name="startOffset"/>. Lengths up to
  /// <c>Constants.BytePayloadLength</c> use a 2-byte header; lengths up to
  /// <see cref="ushort.MaxValue"/> use a 4-byte header with a big-endian 16-bit length.
  /// </summary>
  /// <returns>Offset of the first byte after the header.</returns>
  /// <exception cref="InvalidDataException">
  /// <paramref name="msgLength"/> is greater than <see cref="ushort.MaxValue"/>.
  /// </exception>
  static int WriteHeader(byte[] buffer, int startOffset, int msgLength, bool setMask)
  {
      const byte finBit = 0b1000_0000;
      const byte binaryOpCode = 2;
      const byte maskBit = 0b1000_0000;
      const byte ushortLengthMarker = 126;

      // first byte: "finished" flag plus opcode
      buffer[startOffset] = finBit | binaryOpCode;

      int headerLength;
      if (msgLength <= Constants.BytePayloadLength)
      {
          // length fits directly in the second byte
          buffer[startOffset + 1] = (byte)msgLength;
          headerLength = 2;
      }
      else if (msgLength <= ushort.MaxValue)
      {
          // marker byte followed by the length as two bytes, high byte first
          buffer[startOffset + 1] = ushortLengthMarker;
          buffer[startOffset + 2] = (byte)(msgLength >> 8);
          buffer[startOffset + 3] = (byte)msgLength;
          headerLength = 4;
      }
      else
      {
          throw new InvalidDataException($"Trying to send a message larger than {ushort.MaxValue} bytes");
      }

      if (setMask)
      {
          // top bit of the second byte flags the payload as masked
          buffer[startOffset + 1] |= maskBit;
      }

      return startOffset + headerLength;
  }
  /// <summary>
  /// Produces 4-byte masks from a cryptographic random number generator and copies them into
  /// a caller-supplied buffer. Owns the generator and disposes it in <see cref="Dispose"/>.
  /// </summary>
  sealed class MaskHelper : IDisposable
  {
      // scratch space the generator fills; reused for every mask
      readonly byte[] maskBuffer = new byte[4];
      readonly RNGCryptoServiceProvider random = new RNGCryptoServiceProvider();

      /// <summary>Disposes the underlying random number generator.</summary>
      public void Dispose() => random.Dispose();

      /// <summary>
      /// Writes a fresh random mask into <paramref name="buffer"/> at <paramref name="offset"/>.
      /// </summary>
      /// <returns>Offset of the first byte after the mask.</returns>
      public int WriteMask(byte[] buffer, int offset)
      {
          random.GetBytes(maskBuffer);

          int maskLength = maskBuffer.Length;
          Buffer.BlockCopy(maskBuffer, 0, buffer, offset, maskLength);
          return offset + maskLength;
      }
  }
  175. }
  176. }