ReceiveLoop.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using System.Threading;
  8. using UnityEngine.Profiling;
  9. namespace Mirror.SimpleWeb
  10. {
  11. internal static class ReceiveLoop
  12. {
  /// <summary>
  /// Bundle of the objects and settings that the receive loop works with.
  /// All fields are read-only and the reference-typed ones are validated as
  /// non-null by the constructor.
  /// </summary>
  public struct Config
  {
      /// <summary>Connection whose stream is read by the loop.</summary>
      public readonly Connection conn;

      /// <summary>Upper bound passed to header validation and used to size the read buffer.</summary>
      public readonly int maxMessageSize;

      /// <summary>When true, a mask key is read after the header and applied to each payload.</summary>
      public readonly bool expectMask;

      /// <summary>Queue that received messages and errors are pushed onto.</summary>
      public readonly ConcurrentQueue<Message> queue;

      /// <summary>Pool that payload buffers are taken from.</summary>
      public readonly BufferPool bufferPool;

      /// <summary>
      /// Creates a new config.
      /// </summary>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="conn"/>, <paramref name="queue"/> or <paramref name="bufferPool"/> is null.
      /// </exception>
      public Config(Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool)
      {
          // Validate in the same order as the parameters are declared.
          if (conn is null)
          {
              throw new ArgumentNullException(nameof(conn));
          }
          if (queue is null)
          {
              throw new ArgumentNullException(nameof(queue));
          }
          if (bufferPool is null)
          {
              throw new ArgumentNullException(nameof(bufferPool));
          }

          this.conn = conn;
          this.maxMessageSize = maxMessageSize;
          this.expectMask = expectMask;
          this.queue = queue;
          this.bufferPool = bufferPool;
      }

      /// <summary>
      /// Enables <c>(conn, maxMessageSize, expectMask, queue, bufferPool) = config</c> syntax.
      /// </summary>
      public void Deconstruct(out Connection conn, out int maxMessageSize, out bool expectMask, out ConcurrentQueue<Message> queue, out BufferPool bufferPool)
      {
          bufferPool = this.bufferPool;
          queue = this.queue;
          expectMask = this.expectMask;
          maxMessageSize = this.maxMessageSize;
          conn = this.conn;
      }
  }
  /// <summary>
  /// Values extracted from one frame header by <c>ReadHeader</c>.
  /// </summary>
  private struct Header
  {
      /// <summary>Payload length as reported by <c>MessageProcessor.GetPayloadLength</c>.</summary>
      public int payloadLength;

      /// <summary>Position in the read buffer after the last read; right after the header it is where the payload starts.</summary>
      public int offset;

      /// <summary>Opcode as reported by <c>MessageProcessor.GetOpcode</c>.</summary>
      public int opcode;

      /// <summary>Result of <c>MessageProcessor.Finished</c>; false means more fragments follow.</summary>
      public bool finished;
  }
  /// <summary>
  /// Reads messages from <c>config.conn</c> until its client is no longer connected
  /// or an exception ends the loop. The connection is always disposed on exit.
  /// </summary>
  /// <remarks>
  /// Interrupt, abort, disposed and <c>ReadHelperException</c> exits are only logged.
  /// Every other exception is also pushed onto <c>config.queue</c> as an error message.
  /// The method registers itself with the Unity profiler, so it is presumably meant to
  /// be the entry point of a dedicated thread.
  /// </remarks>
  /// <param name="config">Connection, limits, queue and pool to use.</param>
  public static void Loop(Config config)
  {
      (Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool _) = config;

      Profiler.BeginThreadProfiling("SimpleWeb", $"ReceiveLoop {conn.connId}");

      try
      {
          try
          {
              // Allocate inside the protected region: if the allocation fails
              // (out of memory, or a negative size from a bad maxMessageSize) the
              // finally block below must still end profiling and dispose the
              // connection, and the failure must be reported like any other.
              byte[] readBuffer = new byte[Constants.HeaderSize + (expectMask ? Constants.MaskSize : 0) + maxMessageSize];

              TcpClient client = conn.client;

              while (client.Connected)
              {
                  ReadOneMessage(config, readBuffer);
              }

              Log.Info($"{conn} Not Connected");
          }
          catch (Exception)
          {
              // if interrupted we don't care about other exceptions
              Utils.CheckForInterupt();
              throw;
          }
      }
      catch (ThreadInterruptedException e) { Log.InfoException(e); }
      catch (ThreadAbortException e) { Log.InfoException(e); }
      catch (ObjectDisposedException e) { Log.InfoException(e); }
      catch (ReadHelperException e)
      {
          Log.InfoException(e);
      }
      catch (SocketException e)
      {
          // this could happen if wss client closes stream
          Log.Warn($"ReceiveLoop SocketException\n{e.Message}", false);
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (IOException e)
      {
          // this could happen if client disconnects
          Log.Warn($"ReceiveLoop IOException\n{e.Message}", false);
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (InvalidDataException e)
      {
          Log.Error($"Invalid data from {conn}: {e.Message}");
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (Exception e)
      {
          Log.Exception(e);
          queue.Enqueue(new Message(conn.connId, e));
      }
      finally
      {
          Profiler.EndThreadProfiling();
          conn.Dispose();
      }
  }
  /// <summary>
  /// Reads one complete message from the connection's stream and handles it.
  /// </summary>
  /// <remarks>
  /// A frame that is marked finished is dispatched by opcode: 2 goes to
  /// <c>HandleArrayMessage</c>, 8 goes to <c>HandleCloseMessage</c>, anything else is
  /// ignored here. A frame that is not finished starts a fragmented message: the
  /// following frames are read as continuations, joined into a single pooled buffer
  /// and enqueued.
  /// </remarks>
  /// <param name="config">Connection, limits, queue and pool to use.</param>
  /// <param name="buffer">Scratch buffer that each header and payload is read into.</param>
  static void ReadOneMessage(Config config, byte[] buffer)
  {
      (Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool) = config;
      Stream stream = conn.stream;

      Header header = ReadHeader(config, buffer);

      // After the header has been read, offset is where the payload starts.
      int msgOffset = header.offset;
      header.offset = ReadHelper.Read(stream, buffer, header.offset, header.payloadLength);

      if (header.finished)
      {
          switch (header.opcode)
          {
              case 2:
                  HandleArrayMessage(config, buffer, msgOffset, header.payloadLength);
                  break;
              case 8:
                  HandleCloseMessage(config, buffer, msgOffset, header.payloadLength);
                  break;
          }
          return;
      }

      // todo cache this to avoid allocations
      Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>();
      try
      {
          fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
          int totalSize = header.payloadLength;

          while (!header.finished)
          {
              header = ReadHeader(config, buffer, opCodeContinuation: true);

              // Reject an oversize message before its payload is read and
              // copied into yet another pooled buffer.
              totalSize += header.payloadLength;
              MessageProcessor.ThrowIfMsgLengthTooLong(totalSize, maxMessageSize);

              msgOffset = header.offset;
              header.offset = ReadHelper.Read(stream, buffer, header.offset, header.payloadLength);
              fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
          }

          // Join all fragments, in arrival order, into one buffer.
          ArrayBuffer msg = bufferPool.Take(totalSize);
          msg.count = 0;
          while (fragments.Count > 0)
          {
              ArrayBuffer part = fragments.Dequeue();
              part.CopyTo(msg.array, msg.count);
              msg.count += part.count;
              part.Release();
          }

          // dump after mask off
          Log.DumpBuffer($"Message", msg);
          queue.Enqueue(new Message(conn.connId, msg));
      }
      finally
      {
          // On success the queue is already empty and this does nothing.
          // If reading or validation threw part-way through the message, hand
          // the fragments collected so far back to the pool.
          while (fragments.Count > 0)
          {
              fragments.Dequeue().Release();
          }
      }
  }
  /// <summary>
  /// Reads and validates one frame header from the connection's stream into
  /// <paramref name="buffer"/>, starting at index 0.
  /// </summary>
  /// <param name="config">Supplies the connection, the size limit and whether a mask is expected.</param>
  /// <param name="buffer">Buffer the header bytes are read into.</param>
  /// <param name="opCodeContinuation">Forwarded to <c>MessageProcessor.ValidateHeader</c>.</param>
  /// <returns>
  /// The parsed header; its <c>offset</c> is the buffer position right after the
  /// header (and mask, when one is expected).
  /// </returns>
  static Header ReadHeader(Config config, byte[] buffer, bool opCodeContinuation = false)
  {
      (Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> _, BufferPool _) = config;
      Stream stream = conn.stream;

      // Running write position in buffer; every read returns the new position.
      int offset = 0;

      // The fixed minimum header always comes first.
      offset = ReadHelper.Read(stream, buffer, offset, Constants.HeaderMinSize);
      // log after first blocking call
      Log.Verbose($"Message From {conn}");

      // The first bytes tell us whether an extended length field follows.
      if (MessageProcessor.NeedToReadShortLength(buffer))
      {
          offset = ReadHelper.Read(stream, buffer, offset, Constants.ShortLength);
      }
      if (MessageProcessor.NeedToReadLongLength(buffer))
      {
          offset = ReadHelper.Read(stream, buffer, offset, Constants.LongLength);
      }

      Log.DumpBuffer($"Raw Header", buffer, 0, offset);

      // Validate before reading any further bytes from the stream.
      MessageProcessor.ValidateHeader(buffer, maxMessageSize, expectMask, opCodeContinuation);

      if (expectMask)
      {
          offset = ReadHelper.Read(stream, buffer, offset, Constants.MaskSize);
      }

      int opcode = MessageProcessor.GetOpcode(buffer);
      int payloadLength = MessageProcessor.GetPayloadLength(buffer);
      bool finished = MessageProcessor.Finished(buffer);

      Log.Verbose($"Header ln:{payloadLength} op:{opcode} mask:{expectMask}");

      return new Header
      {
          payloadLength = payloadLength,
          offset = offset,
          opcode = opcode,
          finished = finished,
      };
  }
  /// <summary>
  /// Copies (and unmasks, if required) a payload out of the read buffer into a pooled
  /// buffer and enqueues it as a message for the connection.
  /// </summary>
  /// <param name="config">Supplies the pool, mask flag, queue and connection.</param>
  /// <param name="buffer">Read buffer holding the payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Number of payload bytes.</param>
  static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength)
  {
      ArrayBuffer payload = CopyMessageToBuffer(config.bufferPool, config.expectMask, buffer, msgOffset, payloadLength);

      // dump after mask off
      Log.DumpBuffer($"Message", payload);

      Message received = new Message(config.conn.connId, payload);
      config.queue.Enqueue(received);
  }
  /// <summary>
  /// Takes a buffer from the pool and fills it with the payload found in
  /// <paramref name="buffer"/>, unmasking it on the way when a mask is expected.
  /// </summary>
  /// <param name="bufferPool">Pool the result buffer is taken from.</param>
  /// <param name="expectMask">Whether the payload has to be run through <c>ToggleMask</c>.</param>
  /// <param name="buffer">Read buffer holding the mask key (if any) and the payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Number of payload bytes.</param>
  /// <returns>The pooled buffer containing the payload.</returns>
  static ArrayBuffer CopyMessageToBuffer(BufferPool bufferPool, bool expectMask, byte[] buffer, int msgOffset, int payloadLength)
  {
      ArrayBuffer result = bufferPool.Take(payloadLength);

      if (!expectMask)
      {
          // Nothing to unmask, a plain copy is enough.
          result.CopyFrom(buffer, msgOffset, payloadLength);
          return result;
      }

      // The mask key sits directly in front of the payload. ToggleMask writes its
      // output straight into the pooled buffer, which avoids a second copy.
      MessageProcessor.ToggleMask(buffer, msgOffset, result, payloadLength, buffer, msgOffset - Constants.MaskSize);
      return result;
  }
  /// <summary>
  /// Handles a close frame: unmasks the payload in place when a mask is expected,
  /// logs the close code and reason, then disposes the connection.
  /// </summary>
  /// <remarks>
  /// RFC 6455 allows a close frame with an empty payload. In that case there is no
  /// status code to read, so code 1005 (no status received) and an empty reason are
  /// logged instead of reading bytes that are not part of the frame.
  /// </remarks>
  /// <param name="config">Supplies the connection and the mask flag.</param>
  /// <param name="buffer">Read buffer holding the mask key (if any) and the payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Number of payload bytes; may be 0.</param>
  static void HandleCloseMessage(Config config, byte[] buffer, int msgOffset, int payloadLength)
  {
      // The close code occupies the first two payload bytes.
      const int statusCodeSize = 2;
      // RFC 6455 section 7.4.1: reported when the frame carried no status code.
      const int noStatusReceived = 1005;

      (Connection conn, int _, bool expectMask, ConcurrentQueue<Message> _, BufferPool _) = config;

      if (expectMask)
      {
          // The mask key sits directly in front of the payload.
          int maskOffset = msgOffset - Constants.MaskSize;
          MessageProcessor.ToggleMask(buffer, msgOffset, payloadLength, buffer, maskOffset);
      }

      // dump after mask off
      Log.DumpBuffer($"Message", buffer, msgOffset, payloadLength);

      // Only read the code / reason when the payload is long enough to hold them.
      int closeCode = payloadLength >= statusCodeSize
          ? GetCloseCode(buffer, msgOffset)
          : noStatusReceived;
      string closeMessage = payloadLength > statusCodeSize
          ? GetCloseMessage(buffer, msgOffset, payloadLength)
          : string.Empty;

      Log.Info($"Close: {closeCode} message:{closeMessage}");

      conn.Dispose();
  }
  /// <summary>
  /// Decodes the UTF-8 reason text that follows the 2-byte close code in a close payload.
  /// </summary>
  /// <param name="buffer">Buffer holding the (already unmasked) close payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Total payload length, including the 2-byte close code.</param>
  /// <returns>
  /// The reason text, or an empty string when the payload has no bytes after the
  /// close code (including payloads shorter than 2 bytes).
  /// </returns>
  static string GetCloseMessage(byte[] buffer, int msgOffset, int payloadLength)
  {
      // The close code occupies the first two payload bytes.
      const int statusCodeSize = 2;

      // A close frame may carry no payload at all; without this guard the count
      // passed to GetString would be negative and it would throw.
      if (payloadLength <= statusCodeSize)
      {
          return string.Empty;
      }

      return Encoding.UTF8.GetString(buffer, msgOffset + statusCodeSize, payloadLength - statusCodeSize);
  }
  /// <summary>
  /// Reads the close code stored as a big-endian 16-bit value in the first two
  /// payload bytes.
  /// </summary>
  /// <param name="buffer">Buffer holding the (already unmasked) close payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <returns>The close code, in the range 0..65535.</returns>
  static int GetCloseCode(byte[] buffer, int msgOffset)
  {
      int highByte = buffer[msgOffset];
      int lowByte = buffer[msgOffset + 1];
      return (highByte << 8) | lowByte;
  }
  223. }
  224. }