Client.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. using System;
  2. using System.Net.Sockets;
  3. using System.Threading;
  4. namespace Telepathy
  5. {
  6. // ClientState OBJECT that can be handed to the ReceiveThread safely.
  7. // => allows us to create a NEW OBJECT every time we connect and start a
  8. // receive thread.
  9. // => perfectly protects us against data races. fixes all the flaky tests
  10. // where .Connecting or .client would still be used by a dieing thread
  11. // while attempting to use it for a new connection attempt etc.
  12. // => creating a fresh client state each time is the best solution against
  13. // data races here!
  14. class ClientConnectionState : ConnectionState
  15. {
  16. public Thread receiveThread;
  17. // TcpClient.Connected doesn't check if socket != null, which
  18. // results in NullReferenceExceptions if connection was closed.
  19. // -> let's check it manually instead
  20. public bool Connected => client != null &&
  21. client.Client != null &&
  22. client.Client.Connected;
  23. // TcpClient has no 'connecting' state to check. We need to keep track
  24. // of it manually.
  25. // -> checking 'thread.IsAlive && !Connected' is not enough because the
  26. // thread is alive and connected is false for a short moment after
  27. // disconnecting, so this would cause race conditions.
  28. // -> we use a threadsafe bool wrapper so that ThreadFunction can remain
  29. // static (it needs a common lock)
  30. // => Connecting is true from first Connect() call in here, through the
  31. // thread start, until TcpClient.Connect() returns. Simple and clear.
  32. // => bools are atomic according to
  33. // https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/variables
  34. // made volatile so the compiler does not reorder access to it
  35. public volatile bool Connecting;
  36. // thread safe pipe for received messages
  37. // => inside client connection state so that we can create a new state
  38. // each time we connect
  39. // (unlike server which has one receive pipe for all connections)
  40. public readonly MagnificentReceivePipe receivePipe;
  41. // constructor always creates new TcpClient for client connection!
  42. public ClientConnectionState(int MaxMessageSize) : base(new TcpClient(), MaxMessageSize)
  43. {
  44. // create receive pipe with max message size for pooling
  45. receivePipe = new MagnificentReceivePipe(MaxMessageSize);
  46. }
  47. // dispose all the state safely
  48. public void Dispose()
  49. {
  50. // close client
  51. client.Close();
  52. // wait until thread finished. this is the only way to guarantee
  53. // that we can call Connect() again immediately after Disconnect
  54. // -> calling .Join would sometimes wait forever, e.g. when
  55. // calling Disconnect while trying to connect to a dead end
  56. receiveThread?.Interrupt();
  57. // we interrupted the receive Thread, so we can't guarantee that
  58. // connecting was reset. let's do it manually.
  59. Connecting = false;
  60. // clear send pipe. no need to hold on to elements.
  61. // (unlike receiveQueue, which is still needed to process the
  62. // latest Disconnected message, etc.)
  63. sendPipe.Clear();
  64. // IMPORTANT: DO NOT CLEAR RECEIVE PIPE.
  65. // we still want to process disconnect messages in Tick()!
  66. // let go of this client completely. the thread ended, no one uses
  67. // it anymore and this way Connected is false again immediately.
  68. client = null;
  69. }
  70. }
  71. public class Client : Common
  72. {
  73. // events to hook into
  74. // => OnData uses ArraySegment for allocation free receives later
  75. public Action OnConnected;
  76. public Action<ArraySegment<byte>> OnData;
  77. public Action OnDisconnected;
  78. // disconnect if send queue gets too big.
  79. // -> avoids ever growing queue memory if network is slower than input
  80. // -> disconnecting is great for load balancing. better to disconnect
  81. // one connection than risking every connection / the whole server
  82. // -> huge queue would introduce multiple seconds of latency anyway
  83. //
  84. // Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
  85. // limit = 1,000 means 16 MB of memory/connection
  86. // limit = 10,000 means 160 MB of memory/connection
  87. public int SendQueueLimit = 10000;
  88. public int ReceiveQueueLimit = 10000;
  89. // all client state wrapped into an object that is passed to ReceiveThread
  90. // => we create a new one each time we connect to avoid data races with
  91. // old dieing threads still using the previous object!
  92. ClientConnectionState state;
  93. // Connected & Connecting
  94. public bool Connected => state != null && state.Connected;
  95. public bool Connecting => state != null && state.Connecting;
  96. // pipe count, useful for debugging / benchmarks
  97. public int ReceivePipeCount => state != null ? state.receivePipe.TotalCount : 0;
  98. // constructor
  99. public Client(int MaxMessageSize) : base(MaxMessageSize) {}
  100. // the thread function
  101. // STATIC to avoid sharing state!
  102. // => pass ClientState object. a new one is created for each new thread!
  103. // => avoids data races where an old dieing thread might still modify
  104. // the current thread's state :/
  105. static void ReceiveThreadFunction(ClientConnectionState state, string ip, int port, int MaxMessageSize, bool NoDelay, int SendTimeout, int ReceiveTimeout, int ReceiveQueueLimit)
  106. {
  107. Thread sendThread = null;
  108. // absolutely must wrap with try/catch, otherwise thread
  109. // exceptions are silent
  110. try
  111. {
  112. // connect (blocking)
  113. state.client.Connect(ip, port);
  114. state.Connecting = false; // volatile!
  115. // set socket options after the socket was created in Connect()
  116. // (not after the constructor because we clear the socket there)
  117. state.client.NoDelay = NoDelay;
  118. state.client.SendTimeout = SendTimeout;
  119. state.client.ReceiveTimeout = ReceiveTimeout;
  120. // start send thread only after connected
  121. // IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
  122. sendThread = new Thread(() => { ThreadFunctions.SendLoop(0, state.client, state.sendPipe, state.sendPending); });
  123. sendThread.IsBackground = true;
  124. sendThread.Start();
  125. // run the receive loop
  126. // (receive pipe is shared across all loops)
  127. ThreadFunctions.ReceiveLoop(0, state.client, MaxMessageSize, state.receivePipe, ReceiveQueueLimit);
  128. }
  129. catch (SocketException exception)
  130. {
  131. // this happens if (for example) the ip address is correct
  132. // but there is no server running on that ip/port
  133. Log.Info("Client Recv: failed to connect to ip=" + ip + " port=" + port + " reason=" + exception);
  134. // add 'Disconnected' event to receive pipe so that the caller
  135. // knows that the Connect failed. otherwise they will never know
  136. state.receivePipe.Enqueue(0, EventType.Disconnected, default);
  137. }
  138. catch (ThreadInterruptedException)
  139. {
  140. // expected if Disconnect() aborts it
  141. }
  142. catch (ThreadAbortException)
  143. {
  144. // expected if Disconnect() aborts it
  145. }
  146. catch (ObjectDisposedException)
  147. {
  148. // expected if Disconnect() aborts it and disposed the client
  149. // while ReceiveThread is in a blocking Connect() call
  150. }
  151. catch (Exception exception)
  152. {
  153. // something went wrong. probably important.
  154. Log.Error("Client Recv Exception: " + exception);
  155. }
  156. // sendthread might be waiting on ManualResetEvent,
  157. // so let's make sure to end it if the connection
  158. // closed.
  159. // otherwise the send thread would only end if it's
  160. // actually sending data while the connection is
  161. // closed.
  162. sendThread?.Interrupt();
  163. // Connect might have failed. thread might have been closed.
  164. // let's reset connecting state no matter what.
  165. state.Connecting = false;
  166. // if we got here then we are done. ReceiveLoop cleans up already,
  167. // but we may never get there if connect fails. so let's clean up
  168. // here too.
  169. state.client?.Close();
  170. }
  171. public void Connect(string ip, int port)
  172. {
  173. // not if already started
  174. if (Connecting || Connected)
  175. {
  176. Log.Warning("Telepathy Client can not create connection because an existing connection is connecting or connected");
  177. return;
  178. }
  179. // overwrite old thread's state object. create a new one to avoid
  180. // data races where an old dieing thread might still modify the
  181. // current state! fixes all the flaky tests!
  182. state = new ClientConnectionState(MaxMessageSize);
  183. // We are connecting from now until Connect succeeds or fails
  184. state.Connecting = true;
  185. // create a TcpClient with perfect IPv4, IPv6 and hostname resolving
  186. // support.
  187. //
  188. // * TcpClient(hostname, port): works but would connect (and block)
  189. // already
  190. // * TcpClient(AddressFamily.InterNetworkV6): takes Ipv4 and IPv6
  191. // addresses but only connects to IPv6 servers (e.g. Telepathy).
  192. // does NOT connect to IPv4 servers (e.g. Mirror Booster), even
  193. // with DualMode enabled.
  194. // * TcpClient(): creates IPv4 socket internally, which would force
  195. // Connect() to only use IPv4 sockets.
  196. //
  197. // => the trick is to clear the internal IPv4 socket so that Connect
  198. // resolves the hostname and creates either an IPv4 or an IPv6
  199. // socket as needed (see TcpClient source)
  200. state.client.Client = null; // clear internal IPv4 socket until Connect()
  201. // client.Connect(ip, port) is blocking. let's call it in the thread
  202. // and return immediately.
  203. // -> this way the application doesn't hang for 30s if connect takes
  204. // too long, which is especially good in games
  205. // -> this way we don't async client.BeginConnect, which seems to
  206. // fail sometimes if we connect too many clients too fast
  207. state.receiveThread = new Thread(() => {
  208. ReceiveThreadFunction(state, ip, port, MaxMessageSize, NoDelay, SendTimeout, ReceiveTimeout, ReceiveQueueLimit);
  209. });
  210. state.receiveThread.IsBackground = true;
  211. state.receiveThread.Start();
  212. }
  213. public void Disconnect()
  214. {
  215. // only if started
  216. if (Connecting || Connected)
  217. {
  218. // dispose all the state safely
  219. state.Dispose();
  220. // IMPORTANT: DO NOT set state = null!
  221. // we still want to process the pipe's disconnect message etc.!
  222. }
  223. }
  224. // send message to server using socket connection.
  225. // arraysegment for allocation free sends later.
  226. // -> the segment's array is only used until Send() returns!
  227. public bool Send(ArraySegment<byte> message)
  228. {
  229. if (Connected)
  230. {
  231. // respect max message size to avoid allocation attacks.
  232. if (message.Count <= MaxMessageSize)
  233. {
  234. // check send pipe limit
  235. if (state.sendPipe.Count < SendQueueLimit)
  236. {
  237. // add to thread safe send pipe and return immediately.
  238. // calling Send here would be blocking (sometimes for long
  239. // times if other side lags or wire was disconnected)
  240. state.sendPipe.Enqueue(message);
  241. state.sendPending.Set(); // interrupt SendThread WaitOne()
  242. return true;
  243. }
  244. // disconnect if send queue gets too big.
  245. // -> avoids ever growing queue memory if network is slower
  246. // than input
  247. // -> avoids ever growing latency as well
  248. //
  249. // note: while SendThread always grabs the WHOLE send queue
  250. // immediately, it's still possible that the sending
  251. // blocks for so long that the send queue just gets
  252. // way too big. have a limit - better safe than sorry.
  253. else
  254. {
  255. // log the reason
  256. Log.Warning($"Client.Send: sendPipe reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting to avoid ever growing memory & latency.");
  257. // just close it. send thread will take care of the rest.
  258. state.client.Close();
  259. return false;
  260. }
  261. }
  262. Log.Error("Client.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
  263. return false;
  264. }
  265. Log.Warning("Client.Send: not connected!");
  266. return false;
  267. }
  268. // tick: processes up to 'limit' messages
  269. // => limit parameter to avoid deadlocks / too long freezes if server or
  270. // client is too slow to process network load
  271. // => Mirror & DOTSNET need to have a process limit anyway.
  272. // might as well do it here and make life easier.
  273. // => returns amount of remaining messages to process, so the caller
  274. // can call tick again as many times as needed (or up to a limit)
  275. //
  276. // Tick() may process multiple messages, but Mirror needs a way to stop
  277. // processing immediately if a scene change messages arrives. Mirror
  278. // can't process any other messages during a scene change.
  279. // (could be useful for others too)
  280. // => make sure to allocate the lambda only once in transports
  281. public int Tick(int processLimit, Func<bool> checkEnabled = null)
  282. {
  283. // only if state was created yet (after connect())
  284. // note: we don't check 'only if connected' because we want to still
  285. // process Disconnect messages afterwards too!
  286. if (state == null)
  287. return 0;
  288. // process up to 'processLimit' messages
  289. for (int i = 0; i < processLimit; ++i)
  290. {
  291. // check enabled in case a Mirror scene message arrived
  292. if (checkEnabled != null && !checkEnabled())
  293. break;
  294. // peek first. allows us to process the first queued entry while
  295. // still keeping the pooled byte[] alive by not removing anything.
  296. if (state.receivePipe.TryPeek(out int _, out EventType eventType, out ArraySegment<byte> message))
  297. {
  298. switch (eventType)
  299. {
  300. case EventType.Connected:
  301. OnConnected?.Invoke();
  302. break;
  303. case EventType.Data:
  304. OnData?.Invoke(message);
  305. break;
  306. case EventType.Disconnected:
  307. OnDisconnected?.Invoke();
  308. break;
  309. }
  310. // IMPORTANT: now dequeue and return it to pool AFTER we are
  311. // done processing the event.
  312. state.receivePipe.TryDequeue();
  313. }
  314. // no more messages. stop the loop.
  315. else break;
  316. }
  317. // return what's left to process for next time
  318. return state.receivePipe.TotalCount;
  319. }
  320. }
  321. }