ThreadedTransport.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. // threaded transport to handle all the magic.
  2. // implementations are automatically elevated to the worker thread
  3. // by simply overwriting all the thread functions
  4. //
  5. // note that ThreadLog.cs is required for Debug.Log from threads to work in builds.
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Runtime.CompilerServices;
  9. using System.Threading;
  10. using UnityEngine;
  11. namespace Mirror
  12. {
  13. // buffered events for main thread
  14. enum ClientMainEventType
  15. {
  16. OnClientConnected,
  17. OnClientSent,
  18. OnClientReceived,
  19. OnClientError,
  20. OnClientDisconnected,
  21. }
  22. enum ServerMainEventType
  23. {
  24. OnServerConnected,
  25. OnServerSent,
  26. OnServerReceived,
  27. OnServerError,
  28. OnServerDisconnected,
  29. }
  30. // buffered events for worker thread
  31. enum ThreadEventType
  32. {
  33. DoServerStart,
  34. DoServerSend,
  35. DoServerDisconnect,
  36. DoServerStop,
  37. DoClientConnect,
  38. DoClientSend,
  39. DoClientDisconnect,
  40. DoShutdown
  41. }
  42. struct ClientMainEvent
  43. {
  44. public ClientMainEventType type;
  45. public object param;
  46. // some events have value type parameters: connectionId, error.
  47. // store them explicitly to avoid boxing allocations to 'object param'.
  48. public int? channelId; // connect/disconnect don't have a channel
  49. public TransportError? error;
  50. public ClientMainEvent(
  51. ClientMainEventType type,
  52. object param,
  53. int? channelId = null,
  54. TransportError? error = null)
  55. {
  56. this.type = type;
  57. this.channelId = channelId;
  58. this.error = error;
  59. this.param = param;
  60. }
  61. }
  62. struct ServerMainEvent
  63. {
  64. public ServerMainEventType type;
  65. public object param;
  66. // some events have value type parameters: connectionId, error.
  67. // store them explicitly to avoid boxing allocations to 'object param'.
  68. public int? connectionId; // only server needs connectionId
  69. public int? channelId; // connect/disconnect don't have a channel
  70. public TransportError? error;
  71. public ServerMainEvent(
  72. ServerMainEventType type,
  73. object param,
  74. int? connectionId,
  75. int? channelId = null,
  76. TransportError? error = null)
  77. {
  78. this.type = type;
  79. this.channelId = channelId;
  80. this.connectionId = connectionId;
  81. this.error = error;
  82. this.param = param;
  83. }
  84. }
  85. struct ThreadEvent
  86. {
  87. public ThreadEventType type;
  88. public object param;
  89. // some events have value type parameters: connectionId.
  90. // store them explicitly to avoid boxing allocations to 'object param'.
  91. public int? connectionId;
  92. public int? channelId;
  93. public ThreadEvent(
  94. ThreadEventType type,
  95. object param,
  96. int? connectionId = null,
  97. int? channelId = null)
  98. {
  99. this.type = type;
  100. this.connectionId = connectionId;
  101. this.channelId = channelId;
  102. this.param = param;
  103. }
  104. }
  105. public abstract class ThreadedTransport : Transport
  106. {
  107. WorkerThread thread;
  108. // main thread's event queue.
  109. // worker thread puts events in, main thread processes them.
  110. // client & server separate because EarlyUpdate is separate too.
  111. // TODO nonalloc
  112. readonly ConcurrentQueue<ClientMainEvent> clientMainQueue = new ConcurrentQueue<ClientMainEvent>();
  113. readonly ConcurrentQueue<ServerMainEvent> serverMainQueue = new ConcurrentQueue<ServerMainEvent>();
  114. // worker thread's event queue
  115. // main thread puts events in, worker thread processes them.
  116. // TODO nonalloc
  117. readonly ConcurrentQueue<ThreadEvent> threadQueue = new ConcurrentQueue<ThreadEvent>();
  118. // active flags, since we can't access server/client from main thread
  119. volatile bool serverActive;
  120. volatile bool clientConnected;
  121. // max number of thread messages to process per tick in main thread.
  122. // very large limit to prevent deadlocks.
  123. const int MaxProcessingPerTick = 10_000_000;
  124. // communication between main & worker thread //////////////////////////
  125. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  126. void EnqueueClientMain(
  127. ClientMainEventType type,
  128. object param,
  129. int? channelId,
  130. TransportError? error) =>
  131. clientMainQueue.Enqueue(new ClientMainEvent(type, param, channelId, error));
  132. // add an event for main thread
  133. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  134. void EnqueueServerMain(
  135. ServerMainEventType type,
  136. object param,
  137. int? connectionId,
  138. int? channelId,
  139. TransportError? error) =>
  140. serverMainQueue.Enqueue(new ServerMainEvent(type, param, connectionId, channelId, error));
  141. void EnqueueThread(
  142. ThreadEventType type,
  143. object param,
  144. int? channelId,
  145. int? connectionId) =>
  146. threadQueue.Enqueue(new ThreadEvent(type, param, connectionId, channelId));
  147. // Unity callbacks /////////////////////////////////////////////////////
  148. protected virtual void Awake()
  149. {
  150. // start the thread.
  151. // if main application terminates, this thread needs to terminate too.
  152. thread = new WorkerThread(ToString());
  153. thread.Tick = ThreadTick;
  154. thread.Cleanup = ThreadedShutdown;
  155. thread.Start();
  156. }
  157. protected virtual void OnDestroy()
  158. {
  159. // stop thread fully
  160. Shutdown();
  161. // TODO recycle writers.
  162. }
  163. // worker thread ///////////////////////////////////////////////////////
  164. void ProcessThreadQueue()
  165. {
  166. // TODO deadlock protection. worker thread may be to slow to process all.
  167. while (threadQueue.TryDequeue(out ThreadEvent elem))
  168. {
  169. switch (elem.type)
  170. {
  171. // SERVER EVENTS ///////////////////////////////////////////
  172. case ThreadEventType.DoServerStart: // start listening
  173. {
  174. // call the threaded function
  175. ThreadedServerStart();
  176. break;
  177. }
  178. case ThreadEventType.DoServerSend:
  179. {
  180. // call the threaded function
  181. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  182. ThreadedServerSend(elem.connectionId.Value, writer, elem.channelId.Value);
  183. // recycle writer to thread safe pool for reuse
  184. ConcurrentNetworkWriterPool.Return(writer);
  185. break;
  186. }
  187. case ThreadEventType.DoServerDisconnect:
  188. {
  189. // call the threaded function
  190. ThreadedServerDisconnect(elem.connectionId.Value);
  191. break;
  192. }
  193. case ThreadEventType.DoServerStop: // stop listening
  194. {
  195. // call the threaded function
  196. ThreadedServerStop();
  197. break;
  198. }
  199. // CLIENT EVENTS ///////////////////////////////////////////
  200. case ThreadEventType.DoClientConnect:
  201. {
  202. // call the threaded function
  203. if (elem.param is string address)
  204. ThreadedClientConnect(address);
  205. else if (elem.param is Uri uri)
  206. ThreadedClientConnect(uri);
  207. break;
  208. }
  209. case ThreadEventType.DoClientSend:
  210. {
  211. // call the threaded function
  212. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  213. ThreadedClientSend(writer, elem.channelId.Value);
  214. // recycle writer to thread safe pool for reuse
  215. ConcurrentNetworkWriterPool.Return(writer);
  216. break;
  217. }
  218. case ThreadEventType.DoClientDisconnect:
  219. {
  220. // call the threaded function
  221. ThreadedClientDisconnect();
  222. break;
  223. }
  224. // SHUTDOWN ////////////////////////////////////////////////
  225. case ThreadEventType.DoShutdown:
  226. {
  227. // call the threaded function
  228. ThreadedShutdown();
  229. break;
  230. }
  231. }
  232. }
  233. }
  234. void ThreadTick()
  235. {
  236. // early update the implementation first
  237. ThreadedClientEarlyUpdate();
  238. ThreadedServerEarlyUpdate();
  239. // process queued user requests
  240. ProcessThreadQueue();
  241. // late update the implementation at the end
  242. ThreadedClientLateUpdate();
  243. ThreadedServerLateUpdate();
  244. // save some cpu power.
  245. // TODO update interval and sleep extra time would be ideal
  246. Thread.Sleep(1);
  247. }
  248. // threaded callbacks to call from transport thread.
  249. // they will be queued up for main thread automatically.
  250. protected void OnThreadedClientConnected()
  251. {
  252. EnqueueClientMain(ClientMainEventType.OnClientConnected, null, null, null);
  253. }
  254. protected void OnThreadedClientSend(ArraySegment<byte> message, int channelId)
  255. {
  256. // ArraySegment is only valid until returning.
  257. // copy to a writer until main thread processes it.
  258. // make sure to recycle the writer in main thread.
  259. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  260. writer.WriteBytes(message.Array, message.Offset, message.Count);
  261. EnqueueClientMain(ClientMainEventType.OnClientSent, writer, channelId, null);
  262. }
  263. protected void OnThreadedClientReceive(ArraySegment<byte> message, int channelId)
  264. {
  265. // ArraySegment is only valid until returning.
  266. // copy to a writer until main thread processes it.
  267. // make sure to recycle the writer in main thread.
  268. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  269. writer.WriteBytes(message.Array, message.Offset, message.Count);
  270. EnqueueClientMain(ClientMainEventType.OnClientReceived, writer, channelId, null);
  271. }
  272. protected void OnThreadedClientError(TransportError error, string reason)
  273. {
  274. EnqueueClientMain(ClientMainEventType.OnClientError, reason, null, error);
  275. }
  276. protected void OnThreadedClientDisconnected()
  277. {
  278. EnqueueClientMain(ClientMainEventType.OnClientDisconnected, null, null, null);
  279. }
  280. protected void OnThreadedServerConnected(int connectionId)
  281. {
  282. EnqueueServerMain(ServerMainEventType.OnServerConnected, null, connectionId, null, null);
  283. }
  284. protected void OnThreadedServerSend(int connectionId, ArraySegment<byte> message, int channelId)
  285. {
  286. // ArraySegment is only valid until returning.
  287. // copy to a writer until main thread processes it.
  288. // make sure to recycle the writer in main thread.
  289. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  290. writer.WriteBytes(message.Array, message.Offset, message.Count);
  291. EnqueueServerMain(ServerMainEventType.OnServerSent, writer, connectionId, channelId, null);
  292. }
  293. protected void OnThreadedServerReceive(int connectionId, ArraySegment<byte> message, int channelId)
  294. {
  295. // ArraySegment is only valid until returning.
  296. // copy to a writer until main thread processes it.
  297. // make sure to recycle the writer in main thread.
  298. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  299. writer.WriteBytes(message.Array, message.Offset, message.Count);
  300. EnqueueServerMain(ServerMainEventType.OnServerReceived, writer, connectionId, channelId, null);
  301. }
  302. protected void OnThreadedServerError(int connectionId, TransportError error, string reason)
  303. {
  304. EnqueueServerMain(ServerMainEventType.OnServerError, reason, connectionId, null, error);
  305. }
  306. protected void OnThreadedServerDisconnected(int connectionId)
  307. {
  308. EnqueueServerMain(ServerMainEventType.OnServerDisconnected, null, connectionId, null, null);
  309. }
  310. protected abstract void ThreadedClientConnect(string address);
  311. protected abstract void ThreadedClientConnect(Uri address);
  312. protected abstract void ThreadedClientSend(ArraySegment<byte> message, int channelId);
  313. protected abstract void ThreadedClientDisconnect();
  314. protected abstract void ThreadedServerStart();
  315. protected abstract void ThreadedServerStop();
  316. protected abstract void ThreadedServerSend(int connectionId, ArraySegment<byte> message, int channelId);
  317. protected abstract void ThreadedServerDisconnect(int connectionId);
  318. // threaded update functions.
  319. // make sure not to call main thread OnReceived etc. events.
  320. // queue everything.
  321. protected abstract void ThreadedClientEarlyUpdate();
  322. protected abstract void ThreadedClientLateUpdate();
  323. protected abstract void ThreadedServerEarlyUpdate();
  324. protected abstract void ThreadedServerLateUpdate();
  325. protected abstract void ThreadedShutdown();
  326. // client //////////////////////////////////////////////////////////////
  327. // implementations need to use ThreadedEarlyUpdate
  328. public override void ClientEarlyUpdate()
  329. {
  330. // regular transports process OnReceive etc. from early update.
  331. // need to process the worker thread's queued events here too.
  332. //
  333. // process only up to N messages per tick here.
  334. // if main thread becomes too slow, we don't want to deadlock.
  335. int processed = 0;
  336. while (clientMainQueue.TryDequeue(out ClientMainEvent elem))
  337. {
  338. switch (elem.type)
  339. {
  340. // CLIENT EVENTS ///////////////////////////////////////////
  341. case ClientMainEventType.OnClientConnected:
  342. {
  343. // call original transport event
  344. OnClientConnected?.Invoke();
  345. break;
  346. }
  347. case ClientMainEventType.OnClientSent:
  348. {
  349. // call original transport event
  350. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  351. OnClientDataSent?.Invoke(writer, elem.channelId.Value);
  352. // recycle writer to thread safe pool for reuse
  353. ConcurrentNetworkWriterPool.Return(writer);
  354. break;
  355. }
  356. case ClientMainEventType.OnClientReceived:
  357. {
  358. // call original transport event
  359. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  360. OnClientDataReceived?.Invoke(writer, elem.channelId.Value);
  361. // recycle writer to thread safe pool for reuse
  362. ConcurrentNetworkWriterPool.Return(writer);
  363. break;
  364. }
  365. case ClientMainEventType.OnClientError:
  366. {
  367. // call original transport event
  368. OnClientError?.Invoke(elem.error.Value, (string)elem.param);
  369. break;
  370. }
  371. case ClientMainEventType.OnClientDisconnected:
  372. {
  373. // call original transport event
  374. OnClientDisconnected?.Invoke();
  375. break;
  376. }
  377. }
  378. // process only up to N messages per tick here.
  379. // if main thread becomes too slow, we don't want to deadlock.
  380. if (++processed >= MaxProcessingPerTick)
  381. {
  382. Debug.LogWarning($"ThreadedTransport processed the limit of {MaxProcessingPerTick} messages this tick. Continuing next tick to prevent deadlock.");
  383. break;
  384. }
  385. }
  386. }
  387. // manual state flag because implementations can't access their
  388. // threaded .server/.client state from main thread.
  389. public override bool ClientConnected() => clientConnected;
  390. public override void ClientConnect(string address)
  391. {
  392. // don't connect the thread twice
  393. if (ClientConnected())
  394. {
  395. Debug.LogWarning($"Threaded transport: client already connected!");
  396. return;
  397. }
  398. // enqueue to process in worker thread
  399. EnqueueThread(ThreadEventType.DoClientConnect, address, null, null);
  400. // manual state flag because implementations can't access their
  401. // threaded .server/.client state from main thread.
  402. clientConnected = true;
  403. }
  404. public override void ClientConnect(Uri uri)
  405. {
  406. // don't connect the thread twice
  407. if (ClientConnected())
  408. {
  409. Debug.LogWarning($"Threaded transport: client already connected!");
  410. return;
  411. }
  412. // enqueue to process in worker thread
  413. EnqueueThread(ThreadEventType.DoClientConnect, uri, null, null);
  414. // manual state flag because implementations can't access their
  415. // threaded .server/.client state from main thread.
  416. clientConnected = true;
  417. }
  418. public override void ClientSend(ArraySegment<byte> segment, int channelId = Channels.Reliable)
  419. {
  420. if (!ClientConnected()) return;
  421. // segment is only valid until returning.
  422. // copy it to a writer.
  423. // make sure to recycle it from worker thread.
  424. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  425. writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
  426. // enqueue to process in worker thread
  427. EnqueueThread(ThreadEventType.DoClientSend, writer, channelId, null);
  428. }
  429. public override void ClientDisconnect()
  430. {
  431. EnqueueThread(ThreadEventType.DoClientDisconnect, null, null, null);
  432. // manual state flag because implementations can't access their
  433. // threaded .server/.client state from main thread.
  434. clientConnected = false;
  435. }
  436. // server //////////////////////////////////////////////////////////////
  437. // implementations need to use ThreadedEarlyUpdate
  438. public override void ServerEarlyUpdate()
  439. {
  440. // regular transports process OnReceive etc. from early update.
  441. // need to process the worker thread's queued events here too.
  442. //
  443. // process only up to N messages per tick here.
  444. // if main thread becomes too slow, we don't want to deadlock.
  445. int processed = 0;
  446. while (serverMainQueue.TryDequeue(out ServerMainEvent elem))
  447. {
  448. switch (elem.type)
  449. {
  450. // SERVER EVENTS ///////////////////////////////////////////
  451. case ServerMainEventType.OnServerConnected:
  452. {
  453. // call original transport event
  454. // TODO pass client address in OnConnect here later
  455. OnServerConnected?.Invoke(elem.connectionId.Value);//, (string)elem.param);
  456. break;
  457. }
  458. case ServerMainEventType.OnServerSent:
  459. {
  460. // call original transport event
  461. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  462. OnServerDataSent?.Invoke(elem.connectionId.Value, writer, elem.channelId.Value);
  463. // recycle writer to thread safe pool for reuse
  464. ConcurrentNetworkWriterPool.Return(writer);
  465. break;
  466. }
  467. case ServerMainEventType.OnServerReceived:
  468. {
  469. // call original transport event
  470. ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
  471. OnServerDataReceived?.Invoke(elem.connectionId.Value, writer, elem.channelId.Value);
  472. // recycle writer to thread safe pool for reuse
  473. ConcurrentNetworkWriterPool.Return(writer);
  474. break;
  475. }
  476. case ServerMainEventType.OnServerError:
  477. {
  478. // call original transport event
  479. OnServerError?.Invoke(elem.connectionId.Value, elem.error.Value, (string)elem.param);
  480. break;
  481. }
  482. case ServerMainEventType.OnServerDisconnected:
  483. {
  484. // call original transport event
  485. OnServerDisconnected?.Invoke(elem.connectionId.Value);
  486. break;
  487. }
  488. }
  489. // process only up to N messages per tick here.
  490. // if main thread becomes too slow, we don't want to deadlock.
  491. if (++processed >= MaxProcessingPerTick)
  492. {
  493. Debug.LogWarning($"ThreadedTransport processed the limit of {MaxProcessingPerTick} messages this tick. Continuing next tick to prevent deadlock.");
  494. break;
  495. }
  496. }
  497. }
  498. // implementations need to use ThreadedLateUpdate
  499. public override void ServerLateUpdate() {}
  500. // manual state flag because implementations can't access their
  501. // threaded .server/.client state from main thread.
  502. public override bool ServerActive() => serverActive;
  503. public override void ServerStart()
  504. {
  505. // don't start the thread twice
  506. if (ServerActive())
  507. {
  508. Debug.LogWarning($"Threaded transport: server already started!");
  509. return;
  510. }
  511. // enqueue to process in worker thread
  512. EnqueueThread(ThreadEventType.DoServerStart, null, null, null);
  513. // manual state flag because implementations can't access their
  514. // threaded .server/.client state from main thread.
  515. serverActive = true;
  516. }
  517. public override void ServerSend(int connectionId, ArraySegment<byte> segment, int channelId = Channels.Reliable)
  518. {
  519. if (!ServerActive()) return;
  520. // segment is only valid until returning.
  521. // copy it to a writer.
  522. // make sure to recycle it from worker thread.
  523. ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
  524. writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
  525. // enqueue to process in worker thread
  526. EnqueueThread(ThreadEventType.DoServerSend, writer, channelId, connectionId);
  527. }
  528. public override void ServerDisconnect(int connectionId)
  529. {
  530. // enqueue to process in worker thread
  531. EnqueueThread(ThreadEventType.DoServerDisconnect, null, null, connectionId);
  532. }
  533. // TODO pass address in OnConnected.
  534. // querying this at runtime won't work for threaded transports.
  535. public override string ServerGetClientAddress(int connectionId)
  536. {
  537. throw new NotImplementedException();
  538. }
  539. public override void ServerStop()
  540. {
  541. // enqueue to process in worker thread
  542. EnqueueThread(ThreadEventType.DoServerStop, null, null, null);
  543. // manual state flag because implementations can't access their
  544. // threaded .server/.client state from main thread.
  545. serverActive = false;
  546. }
  547. // shutdown ////////////////////////////////////////////////////////////
  548. public override void Shutdown()
  549. {
  550. // enqueue to process in worker thread
  551. EnqueueThread(ThreadEventType.DoShutdown, null, null, null);
  552. // need to wait a little for worker thread to process the enqueued
  553. // Shutdown event and do proper cleanup.
  554. //
  555. // otherwise if a server with a connected client is stopped,
  556. // and then started, a warning would be shown when starting again
  557. // about an old connection not being found because it wasn't cleared
  558. // in KCP
  559. // TODO cleaner
  560. Thread.Sleep(100);
  561. // stop thread fully, with timeout
  562. // ?.: 'thread' might be null after script reload -> stop play
  563. thread?.StopBlocking(1);
  564. // clear queues so we don't process old messages
  565. // when listening again later
  566. clientMainQueue.Clear();
  567. serverMainQueue.Clear();
  568. threadQueue.Clear();
  569. }
  570. }
  571. }