ReceiveLoop.cs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.IO;
  4. using System.Net.Sockets;
  5. using System.Text;
  6. using System.Threading;
  7. using UnityEngine.Profiling;
  8. namespace Mirror.SimpleWeb
  9. {
  10. internal static class ReceiveLoop
  11. {
  /// <summary>
  /// Read-only bundle of the per-connection values used by <see cref="ReceiveLoop"/>.
  /// Can be deconstructed into its five parts via <see cref="Deconstruct"/>.
  /// </summary>
  public struct Config
  {
      /// <summary>Connection whose stream is read from.</summary>
      public readonly Connection conn;

      /// <summary>Maximum message size; also used to size the receive buffer.</summary>
      public readonly int maxMessageSize;

      /// <summary>True when incoming frames are expected to carry a mask.</summary>
      public readonly bool expectMask;

      /// <summary>Queue that received messages and errors are pushed onto.</summary>
      public readonly ConcurrentQueue<Message> queue;

      /// <summary>Pool that message buffers are taken from.</summary>
      public readonly BufferPool bufferPool;

      /// <summary>
      /// Creates a config, rejecting null reference arguments up front.
      /// </summary>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="conn"/>, <paramref name="queue"/> or <paramref name="bufferPool"/> is null
      /// (checked in that order).
      /// </exception>
      public Config(Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool)
      {
          if (conn is null)
          {
              throw new ArgumentNullException(nameof(conn));
          }
          if (queue is null)
          {
              throw new ArgumentNullException(nameof(queue));
          }
          if (bufferPool is null)
          {
              throw new ArgumentNullException(nameof(bufferPool));
          }

          this.conn = conn;
          this.maxMessageSize = maxMessageSize;
          this.expectMask = expectMask;
          this.queue = queue;
          this.bufferPool = bufferPool;
      }

      /// <summary>
      /// Copies every field into the matching out parameter, enabling
      /// <c>(conn, maxMessageSize, expectMask, queue, bufferPool) = config</c>.
      /// </summary>
      public void Deconstruct(out Connection conn, out int maxMessageSize, out bool expectMask, out ConcurrentQueue<Message> queue, out BufferPool bufferPool)
      {
          bufferPool = this.bufferPool;
          queue = this.queue;
          expectMask = this.expectMask;
          maxMessageSize = this.maxMessageSize;
          conn = this.conn;
      }
  }
  /// <summary>
  /// Blocking receive loop for one connection: reads messages with
  /// <c>ReadOneMessage</c> for as long as the connection's <see cref="TcpClient"/>
  /// reports that it is connected.
  /// </summary>
  /// <remarks>
  /// No exception escapes this method except a <see cref="ThreadAbortException"/>,
  /// which the runtime re-raises after its catch block.
  /// <list type="bullet">
  /// <item>Thread interrupt/abort, <see cref="ObjectDisposedException"/> and
  /// <c>ReadHelperException</c> are logged at info level only.</item>
  /// <item><see cref="SocketException"/> and <see cref="IOException"/> are logged as
  /// warnings and reported through the queue.</item>
  /// <item><see cref="InvalidDataException"/> and every other exception are logged as
  /// errors and reported through the queue.</item>
  /// </list>
  /// The connection is always disposed and thread profiling is always ended on exit.
  /// </remarks>
  /// <param name="config">Connection, limits, queue and buffer pool for this loop.</param>
  public static void Loop(Config config)
  {
      (Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool _) = config;

      Profiler.BeginThreadProfiling("SimpleWeb", $"ReceiveLoop {conn.connId}");

      try
      {
          try
          {
              // Allocated inside the try block so that a failed allocation (for example an
              // overflowing size when maxMessageSize is very large) still reaches the
              // handlers and the finally block below instead of leaking the connection.
              byte[] readBuffer = new byte[Constants.HeaderSize + (expectMask ? Constants.MaskSize : 0) + maxMessageSize];

              TcpClient client = conn.client;

              while (client.Connected)
              {
                  ReadOneMessage(config, readBuffer);
              }

              Log.Info($"{conn} Not Connected");
          }
          catch (Exception)
          {
              // if interrupted we don't care about other exceptions
              Utils.CheckForInterupt();
              throw;
          }
      }
      catch (ThreadInterruptedException e) { Log.InfoException(e); }
      catch (ThreadAbortException e) { Log.InfoException(e); }
      catch (ObjectDisposedException e) { Log.InfoException(e); }
      catch (ReadHelperException e)
      {
          // log as info only
          Log.InfoException(e);
      }
      catch (SocketException e)
      {
          // this could happen if wss client closes stream
          Log.Warn($"ReceiveLoop SocketException\n{e.Message}", false);
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (IOException e)
      {
          // this could happen if client disconnects
          Log.Warn($"ReceiveLoop IOException\n{e.Message}", false);
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (InvalidDataException e)
      {
          Log.Error($"Invalid data from {conn}: {e.Message}");
          queue.Enqueue(new Message(conn.connId, e));
      }
      catch (Exception e)
      {
          Log.Exception(e);
          queue.Enqueue(new Message(conn.connId, e));
      }
      finally
      {
          Profiler.EndThreadProfiling();
          conn.Dispose();
      }
  }
  /// <summary>
  /// Reads exactly one frame from the connection's stream into <paramref name="buffer"/>
  /// (header, optional extended length, optional mask, then payload) and dispatches it
  /// by opcode.
  /// </summary>
  /// <param name="config">Supplies the connection, the size limit and whether a mask is expected.</param>
  /// <param name="buffer">
  /// Scratch buffer reused between calls; the frame is written starting at index 0.
  /// </param>
  static void ReadOneMessage(Config config, byte[] buffer)
  {
      (Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> _, BufferPool _) = config;
      Stream stream = conn.stream;

      // read 2
      // NOTE(review): ReadHelper.Read appears to return the offset just past the bytes it read - confirm.
      int offset = ReadHelper.Read(stream, buffer, 0, Constants.HeaderMinSize);
      // log after first blocking call
      Log.Verbose($"Message From {conn}");

      if (MessageProcessor.NeedToReadShortLength(buffer))
      {
          offset = ReadHelper.Read(stream, buffer, offset, Constants.ShortLength);
      }

      // Validate before reading anything else so a bad header is rejected early.
      MessageProcessor.ValidateHeader(buffer, maxMessageSize, expectMask);

      if (expectMask)
      {
          offset = ReadHelper.Read(stream, buffer, offset, Constants.MaskSize);
      }

      int opcode = MessageProcessor.GetOpcode(buffer);
      int payloadLength = MessageProcessor.GetPayloadLength(buffer);

      Log.Verbose($"Header ln:{payloadLength} op:{opcode} mask:{expectMask}");
      Log.DumpBuffer("Raw Header", buffer, 0, offset);

      // The payload starts right after the header (and the mask, when present).
      int msgOffset = offset;
      ReadHelper.Read(stream, buffer, offset, payloadLength);

      switch (opcode)
      {
          case 2: // binary frame (RFC 6455 section 5.2)
              HandleArrayMessage(config, buffer, msgOffset, payloadLength);
              break;
          case 8: // connection close frame (RFC 6455 section 5.2)
              HandleCloseMessage(config, buffer, msgOffset, payloadLength);
              break;
          // Any other opcode that reaches this point has its payload read and then ignored.
      }
  }
  /// <summary>
  /// Copies the payload of a received data frame into a pooled buffer (unmasking it on
  /// the way when a mask is expected) and enqueues it as a message for the connection.
  /// </summary>
  /// <param name="config">Supplies the connection, mask expectation, queue and buffer pool.</param>
  /// <param name="buffer">Receive buffer holding the frame.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Number of payload bytes.</param>
  static void HandleArrayMessage(Config config, byte[] buffer, int msgOffset, int payloadLength)
  {
      ArrayBuffer payload = config.bufferPool.Take(payloadLength);

      if (!config.expectMask)
      {
          payload.CopyFrom(buffer, msgOffset, payloadLength);
      }
      else
      {
          // The mask key is expected immediately before the payload in the receive buffer.
          // Unmasking straight into the pooled buffer avoids a second copy.
          MessageProcessor.ToggleMask(buffer, msgOffset, payload, payloadLength, buffer, msgOffset - Constants.MaskSize);
      }

      // dump after mask off
      Log.DumpBuffer("Message", payload);

      config.queue.Enqueue(new Message(config.conn.connId, payload));
  }
  /// <summary>
  /// Handles a received close frame: unmasks the payload in place when a mask is
  /// expected, logs the close code and reason if the frame carries them, then disposes
  /// the connection.
  /// </summary>
  /// <param name="config">Supplies the connection and whether a mask is expected.</param>
  /// <param name="buffer">Receive buffer holding the frame.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Number of payload bytes; may be 0 for a close frame without a body.</param>
  static void HandleCloseMessage(Config config, byte[] buffer, int msgOffset, int payloadLength)
  {
      (Connection conn, int _, bool expectMask, ConcurrentQueue<Message> _, BufferPool _) = config;

      if (expectMask)
      {
          // The mask key is expected immediately before the payload in the receive buffer.
          int maskOffset = msgOffset - Constants.MaskSize;
          MessageProcessor.ToggleMask(buffer, msgOffset, payloadLength, buffer, maskOffset);
      }

      // dump after mask off
      Log.DumpBuffer("Message", buffer, msgOffset, payloadLength);

      // A close frame body is optional (RFC 6455 section 5.5.1). Only parse the 2-byte
      // status code and the reason text when they are actually present; otherwise the
      // bytes at msgOffset are not part of this frame and the reason length is negative.
      if (payloadLength >= 2)
      {
          Log.Info($"Close: {GetCloseCode(buffer, msgOffset)} message:{GetCloseMessage(buffer, msgOffset, payloadLength)}");
      }
      else
      {
          Log.Info("Close: no status code");
      }

      conn.Dispose();
  }
  /// <summary>
  /// Decodes the UTF-8 reason text of a close frame, i.e. the payload bytes that follow
  /// the 2-byte close code.
  /// </summary>
  /// <param name="buffer">Buffer holding the (already unmasked) close payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <param name="payloadLength">Total payload length, including the 2-byte close code.</param>
  /// <returns>
  /// The reason text, or an empty string when the payload holds no bytes beyond the close code.
  /// </returns>
  static string GetCloseMessage(byte[] buffer, int msgOffset, int payloadLength)
  {
      const int closeCodeSize = 2;

      int reasonLength = payloadLength - closeCodeSize;
      if (reasonLength <= 0)
      {
          // A close frame may have no body or only a code; a negative count would make
          // Encoding.GetString throw ArgumentOutOfRangeException.
          return string.Empty;
      }

      return Encoding.UTF8.GetString(buffer, msgOffset + closeCodeSize, reasonLength);
  }
  /// <summary>
  /// Reads the close code stored as a big-endian 16-bit value in the first two payload bytes.
  /// </summary>
  /// <param name="buffer">Buffer holding the (already unmasked) close payload.</param>
  /// <param name="msgOffset">Index in <paramref name="buffer"/> where the payload starts.</param>
  /// <returns>The two bytes at <paramref name="msgOffset"/> combined, high byte first.</returns>
  static int GetCloseCode(byte[] buffer, int msgOffset)
  {
      int highByte = buffer[msgOffset];
      int lowByte = buffer[msgOffset + 1];
      return (highByte << 8) | lowByte;
  }
  168. }
  169. }