// Server.cs
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. namespace Telepathy
  8. {
  9. public class Server : Common
  10. {
  11. // events to hook into
  12. // => OnData uses ArraySegment for allocation free receives later
  13. public Action<int> OnConnected;
  14. public Action<int, ArraySegment<byte>> OnData;
  15. public Action<int> OnDisconnected;
  16. // listener
  17. public TcpListener listener;
  18. Thread listenerThread;
  19. // disconnect if send queue gets too big.
  20. // -> avoids ever growing queue memory if network is slower than input
  21. // -> disconnecting is great for load balancing. better to disconnect
  22. // one connection than risking every connection / the whole server
  23. // -> huge queue would introduce multiple seconds of latency anyway
  24. //
  25. // Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
  26. // limit = 1,000 means 16 MB of memory/connection
  27. // limit = 10,000 means 160 MB of memory/connection
  28. public int SendQueueLimit = 10000;
  29. public int ReceiveQueueLimit = 10000;
  30. // thread safe pipe for received messages
  31. // IMPORTANT: unfortunately using one pipe per connection is way slower
  32. // when testing 150 CCU. we need to use one pipe for all
  33. // connections. this scales beautifully.
  34. protected MagnificentReceivePipe receivePipe;
  35. // pipe count, useful for debugging / benchmarks
  36. public int ReceivePipeTotalCount => receivePipe.TotalCount;
  37. // clients with <connectionId, ConnectionState>
  38. readonly ConcurrentDictionary<int, ConnectionState> clients = new ConcurrentDictionary<int, ConnectionState>();
  39. // connectionId counter
  40. int counter;
  41. // public next id function in case someone needs to reserve an id
  42. // (e.g. if hostMode should always have 0 connection and external
  43. // connections should start at 1, etc.)
  44. public int NextConnectionId()
  45. {
  46. int id = Interlocked.Increment(ref counter);
  47. // it's very unlikely that we reach the uint limit of 2 billion.
  48. // even with 1 new connection per second, this would take 68 years.
  49. // -> but if it happens, then we should throw an exception because
  50. // the caller probably should stop accepting clients.
  51. // -> it's hardly worth using 'bool Next(out id)' for that case
  52. // because it's just so unlikely.
  53. if (id == int.MaxValue)
  54. {
  55. throw new Exception("connection id limit reached: " + id);
  56. }
  57. return id;
  58. }
  59. // check if the server is running
  60. public bool Active => listenerThread != null && listenerThread.IsAlive;
  61. // constructor
  62. public Server(int MaxMessageSize) : base(MaxMessageSize) {}
  63. // the listener thread's listen function
  64. // note: no maxConnections parameter. high level API should handle that.
  65. // (Transport can't send a 'too full' message anyway)
  66. void Listen(int port)
  67. {
  68. // absolutely must wrap with try/catch, otherwise thread
  69. // exceptions are silent
  70. try
  71. {
  72. // start listener on all IPv4 and IPv6 address via .Create
  73. listener = TcpListener.Create(port);
  74. listener.Server.NoDelay = NoDelay;
  75. // IMPORTANT: do not set send/receive timeouts on listener.
  76. // On linux setting the recv timeout will cause the blocking
  77. // Accept call to timeout with EACCEPT (which mono interprets
  78. // as EWOULDBLOCK).
  79. // https://stackoverflow.com/questions/1917814/eagain-error-for-accept-on-blocking-socket/1918118#1918118
  80. // => fixes https://github.com/vis2k/Mirror/issues/2695
  81. //
  82. //listener.Server.SendTimeout = SendTimeout;
  83. //listener.Server.ReceiveTimeout = ReceiveTimeout;
  84. listener.Start();
  85. Log.Info("Server: listening port=" + port);
  86. // keep accepting new clients
  87. while (true)
  88. {
  89. // wait and accept new client
  90. // note: 'using' sucks here because it will try to
  91. // dispose after thread was started but we still need it
  92. // in the thread
  93. TcpClient client = listener.AcceptTcpClient();
  94. // set socket options
  95. client.NoDelay = NoDelay;
  96. client.SendTimeout = SendTimeout;
  97. client.ReceiveTimeout = ReceiveTimeout;
  98. // generate the next connection id (thread safely)
  99. int connectionId = NextConnectionId();
  100. // add to dict immediately
  101. ConnectionState connection = new ConnectionState(client, MaxMessageSize);
  102. clients[connectionId] = connection;
  103. // spawn a send thread for each client
  104. Thread sendThread = new Thread(() =>
  105. {
  106. // wrap in try-catch, otherwise Thread exceptions
  107. // are silent
  108. try
  109. {
  110. // run the send loop
  111. // IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
  112. ThreadFunctions.SendLoop(connectionId, client, connection.sendPipe, connection.sendPending);
  113. }
  114. catch (ThreadAbortException)
  115. {
  116. // happens on stop. don't log anything.
  117. // (we catch it in SendLoop too, but it still gets
  118. // through to here when aborting. don't show an
  119. // error.)
  120. }
  121. catch (Exception exception)
  122. {
  123. Log.Error("Server send thread exception: " + exception);
  124. }
  125. });
  126. sendThread.IsBackground = true;
  127. sendThread.Start();
  128. // spawn a receive thread for each client
  129. Thread receiveThread = new Thread(() =>
  130. {
  131. // wrap in try-catch, otherwise Thread exceptions
  132. // are silent
  133. try
  134. {
  135. // run the receive loop
  136. // (receive pipe is shared across all loops)
  137. ThreadFunctions.ReceiveLoop(connectionId, client, MaxMessageSize, receivePipe, ReceiveQueueLimit);
  138. // IMPORTANT: do NOT remove from clients after the
  139. // thread ends. need to do it in Tick() so that the
  140. // disconnect event in the pipe is still processed.
  141. // (removing client immediately would mean that the
  142. // pipe is lost and the disconnect event is never
  143. // processed)
  144. // sendthread might be waiting on ManualResetEvent,
  145. // so let's make sure to end it if the connection
  146. // closed.
  147. // otherwise the send thread would only end if it's
  148. // actually sending data while the connection is
  149. // closed.
  150. sendThread.Interrupt();
  151. }
  152. catch (Exception exception)
  153. {
  154. Log.Error("Server client thread exception: " + exception);
  155. }
  156. });
  157. receiveThread.IsBackground = true;
  158. receiveThread.Start();
  159. }
  160. }
  161. catch (ThreadAbortException exception)
  162. {
  163. // UnityEditor causes AbortException if thread is still
  164. // running when we press Play again next time. that's okay.
  165. Log.Info("Server thread aborted. That's okay. " + exception);
  166. }
  167. catch (SocketException exception)
  168. {
  169. // calling StopServer will interrupt this thread with a
  170. // 'SocketException: interrupted'. that's okay.
  171. Log.Info("Server Thread stopped. That's okay. " + exception);
  172. }
  173. catch (Exception exception)
  174. {
  175. // something went wrong. probably important.
  176. Log.Error("Server Exception: " + exception);
  177. }
  178. }
  179. // start listening for new connections in a background thread and spawn
  180. // a new thread for each one.
  181. public bool Start(int port)
  182. {
  183. // not if already started
  184. if (Active) return false;
  185. // create receive pipe with max message size for pooling
  186. // => create new pipes every time!
  187. // if an old receive thread is still finishing up, it might still
  188. // be using the old pipes. we don't want to risk any old data for
  189. // our new start here.
  190. receivePipe = new MagnificentReceivePipe(MaxMessageSize);
  191. // start the listener thread
  192. // (on low priority. if main thread is too busy then there is not
  193. // much value in accepting even more clients)
  194. Log.Info("Server: Start port=" + port);
  195. listenerThread = new Thread(() => { Listen(port); });
  196. listenerThread.IsBackground = true;
  197. listenerThread.Priority = ThreadPriority.BelowNormal;
  198. listenerThread.Start();
  199. return true;
  200. }
  201. public void Stop()
  202. {
  203. // only if started
  204. if (!Active) return;
  205. Log.Info("Server: stopping...");
  206. // stop listening to connections so that no one can connect while we
  207. // close the client connections
  208. // (might be null if we call Stop so quickly after Start that the
  209. // thread was interrupted before even creating the listener)
  210. listener?.Stop();
  211. // kill listener thread at all costs. only way to guarantee that
  212. // .Active is immediately false after Stop.
  213. // -> calling .Join would sometimes wait forever
  214. listenerThread?.Interrupt();
  215. listenerThread = null;
  216. // close all client connections
  217. foreach (KeyValuePair<int, ConnectionState> kvp in clients)
  218. {
  219. TcpClient client = kvp.Value.client;
  220. // close the stream if not closed yet. it may have been closed
  221. // by a disconnect already, so use try/catch
  222. try { client.GetStream().Close(); } catch {}
  223. client.Close();
  224. }
  225. // clear clients list
  226. clients.Clear();
  227. // reset the counter in case we start up again so
  228. // clients get connection ID's starting from 1
  229. counter = 0;
  230. }
  231. // send message to client using socket connection.
  232. // arraysegment for allocation free sends later.
  233. // -> the segment's array is only used until Send() returns!
  234. public bool Send(int connectionId, ArraySegment<byte> message)
  235. {
  236. // respect max message size to avoid allocation attacks.
  237. if (message.Count <= MaxMessageSize)
  238. {
  239. // find the connection
  240. if (clients.TryGetValue(connectionId, out ConnectionState connection))
  241. {
  242. // check send pipe limit
  243. if (connection.sendPipe.Count < SendQueueLimit)
  244. {
  245. // add to thread safe send pipe and return immediately.
  246. // calling Send here would be blocking (sometimes for long
  247. // times if other side lags or wire was disconnected)
  248. connection.sendPipe.Enqueue(message);
  249. connection.sendPending.Set(); // interrupt SendThread WaitOne()
  250. return true;
  251. }
  252. // disconnect if send queue gets too big.
  253. // -> avoids ever growing queue memory if network is slower
  254. // than input
  255. // -> disconnecting is great for load balancing. better to
  256. // disconnect one connection than risking every
  257. // connection / the whole server
  258. //
  259. // note: while SendThread always grabs the WHOLE send queue
  260. // immediately, it's still possible that the sending
  261. // blocks for so long that the send queue just gets
  262. // way too big. have a limit - better safe than sorry.
  263. else
  264. {
  265. // log the reason
  266. Log.Warning($"Server.Send: sendPipe for connection {connectionId} reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting this connection for load balancing.");
  267. // just close it. send thread will take care of the rest.
  268. connection.client.Close();
  269. return false;
  270. }
  271. }
  272. // sending to an invalid connectionId is expected sometimes.
  273. // for example, if a client disconnects, the server might still
  274. // try to send for one frame before it calls GetNextMessages
  275. // again and realizes that a disconnect happened.
  276. // so let's not spam the console with log messages.
  277. //Logger.Log("Server.Send: invalid connectionId: " + connectionId);
  278. return false;
  279. }
  280. Log.Error("Server.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
  281. return false;
  282. }
  283. // client's ip is sometimes needed by the server, e.g. for bans
  284. public string GetClientAddress(int connectionId)
  285. {
  286. // find the connection
  287. if (clients.TryGetValue(connectionId, out ConnectionState connection))
  288. {
  289. return ((IPEndPoint)connection.client.Client.RemoteEndPoint).Address.ToString();
  290. }
  291. return "";
  292. }
  293. // disconnect (kick) a client
  294. public bool Disconnect(int connectionId)
  295. {
  296. // find the connection
  297. if (clients.TryGetValue(connectionId, out ConnectionState connection))
  298. {
  299. // just close it. send thread will take care of the rest.
  300. connection.client.Close();
  301. Log.Info("Server.Disconnect connectionId:" + connectionId);
  302. return true;
  303. }
  304. return false;
  305. }
  306. // tick: processes up to 'limit' messages for each connection
  307. // => limit parameter to avoid deadlocks / too long freezes if server or
  308. // client is too slow to process network load
  309. // => Mirror & DOTSNET need to have a process limit anyway.
  310. // might as well do it here and make life easier.
  311. // => returns amount of remaining messages to process, so the caller
  312. // can call tick again as many times as needed (or up to a limit)
  313. //
  314. // Tick() may process multiple messages, but Mirror needs a way to stop
  315. // processing immediately if a scene change messages arrives. Mirror
  316. // can't process any other messages during a scene change.
  317. // (could be useful for others too)
  318. // => make sure to allocate the lambda only once in transports
  319. public int Tick(int processLimit, Func<bool> checkEnabled = null)
  320. {
  321. // only if pipe was created yet (after start())
  322. if (receivePipe == null)
  323. return 0;
  324. // process up to 'processLimit' messages for this connection
  325. for (int i = 0; i < processLimit; ++i)
  326. {
  327. // check enabled in case a Mirror scene message arrived
  328. if (checkEnabled != null && !checkEnabled())
  329. break;
  330. // peek first. allows us to process the first queued entry while
  331. // still keeping the pooled byte[] alive by not removing anything.
  332. if (receivePipe.TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> message))
  333. {
  334. switch (eventType)
  335. {
  336. case EventType.Connected:
  337. OnConnected?.Invoke(connectionId);
  338. break;
  339. case EventType.Data:
  340. OnData?.Invoke(connectionId, message);
  341. break;
  342. case EventType.Disconnected:
  343. OnDisconnected?.Invoke(connectionId);
  344. // remove disconnected connection now that the final
  345. // disconnected message was processed.
  346. clients.TryRemove(connectionId, out ConnectionState _);
  347. break;
  348. }
  349. // IMPORTANT: now dequeue and return it to pool AFTER we are
  350. // done processing the event.
  351. receivePipe.TryDequeue();
  352. }
  353. // no more messages. stop the loop.
  354. else break;
  355. }
  356. // return what's left to process for next time
  357. return receivePipe.TotalCount;
  358. }
  359. }
  360. }