ThreadedKcpTransport.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Threaded version of our KCP transport.
  2. // Elevates a few milliseconds of transport computations into a worker thread.
  3. //
  4. //#if MIRROR <- commented out because MIRROR isn't defined on first import yet
  5. using System;
  6. using System.Linq;
  7. using System.Net;
  8. using UnityEngine;
  9. using Mirror;
  10. using Unity.Collections;
  11. using UnityEngine.Serialization;
  12. namespace kcp2k
  13. {
  14. [HelpURL("https://mirror-networking.gitbook.io/docs/transports/kcp-transport")]
  15. [DisallowMultipleComponent]
  16. public class ThreadedKcpTransport : ThreadedTransport, PortTransport
  17. {
  18. // scheme used by this transport
  19. public const string Scheme = "kcp";
  20. // common
  21. [Header("Transport Configuration")]
  22. [FormerlySerializedAs("Port")]
  23. public ushort port = 7777;
  24. public ushort Port { get => port; set => port=value; }
  25. [Tooltip("DualMode listens to IPv6 and IPv4 simultaneously. Disable if the platform only supports IPv4.")]
  26. public bool DualMode = true;
  27. [Tooltip("NoDelay is recommended to reduce latency. This also scales better without buffers getting full.")]
  28. public bool NoDelay = true;
  29. [Tooltip("KCP internal update interval. 100ms is KCP default, but a lower interval is recommended to minimize latency and to scale to more networked entities.")]
  30. public uint Interval = 10;
  31. [Tooltip("KCP timeout in milliseconds. Note that KCP sends a ping automatically.")]
  32. public int Timeout = 10000;
  33. [Tooltip("Socket receive buffer size. Large buffer helps support more connections. Increase operating system socket buffer size limits if needed.")]
  34. public int RecvBufferSize = 1024 * 1027 * 7;
  35. [Tooltip("Socket send buffer size. Large buffer helps support more connections. Increase operating system socket buffer size limits if needed.")]
  36. public int SendBufferSize = 1024 * 1027 * 7;
  37. [Header("Advanced")]
  38. [Tooltip("KCP fastresend parameter. Faster resend for the cost of higher bandwidth. 0 in normal mode, 2 in turbo mode.")]
  39. public int FastResend = 2;
  40. [Tooltip("KCP congestion window. Restricts window size to reduce congestion. Results in only 2-3 MTU messages per Flush even on loopback. Best to keept his disabled.")]
  41. /*public*/ bool CongestionWindow = false; // KCP 'NoCongestionWindow' is false by default. here we negate it for ease of use.
  42. [Tooltip("KCP window size can be modified to support higher loads. This also increases max message size.")]
  43. public uint ReceiveWindowSize = 4096; //Kcp.WND_RCV; 128 by default. Mirror sends a lot, so we need a lot more.
  44. [Tooltip("KCP window size can be modified to support higher loads.")]
  45. public uint SendWindowSize = 4096; //Kcp.WND_SND; 32 by default. Mirror sends a lot, so we need a lot more.
  46. [Tooltip("KCP will try to retransmit lost messages up to MaxRetransmit (aka dead_link) before disconnecting.")]
  47. public uint MaxRetransmit = Kcp.DEADLINK * 2; // default prematurely disconnects a lot of people (#3022). use 2x.
  48. [Tooltip("Enable to automatically set client & server send/recv buffers to OS limit. Avoids issues with too small buffers under heavy load, potentially dropping connections. Increase the OS limit if this is still too small.")]
  49. [FormerlySerializedAs("MaximizeSendReceiveBuffersToOSLimit")]
  50. public bool MaximizeSocketBuffers = true;
  51. [Header("Allowed Max Message Sizes\nBased on Receive Window Size")]
  52. [Tooltip("KCP reliable max message size shown for convenience. Can be changed via ReceiveWindowSize.")]
  53. [ReadOnly] public int ReliableMaxMessageSize = 0; // readonly, displayed from OnValidate
  54. [Tooltip("KCP unreliable channel max message size for convenience. Not changeable.")]
  55. [ReadOnly] public int UnreliableMaxMessageSize = 0; // readonly, displayed from OnValidate
  56. // config is created from the serialized properties above.
  57. // we can expose the config directly in the future.
  58. // for now, let's not break people's old settings.
  59. protected KcpConfig config;
  60. // use default MTU for this transport.
  61. const int MTU = Kcp.MTU_DEF;
  62. // server & client
  63. KcpServer server; // USED IN WORKER THREAD. DON'T TOUCH FROM MAIN THREAD!
  64. KcpClient client; // USED IN WORKER THREAD. DON'T TOUCH FROM MAIN THREAD!
  65. // copy MonoBehaviour.enabled for thread safe access
  66. volatile bool enabledCopy = true;
  67. // debugging
  68. [Header("Debug")]
  69. public bool debugLog;
  70. // show statistics in OnGUI
  71. public bool statisticsGUI;
  72. // log statistics for headless servers that can't show them in GUI
  73. public bool statisticsLog;
  74. protected override void Awake()
  75. {
  76. // logging
  77. // Log.Info should use Debug.Log if enabled, or nothing otherwise
  78. // (don't want to spam the console on headless servers)
  79. // THREAD SAFE thanks to ThreadLog.cs
  80. if (debugLog)
  81. Log.Info = Debug.Log;
  82. else
  83. Log.Info = _ => {};
  84. Log.Warning = Debug.LogWarning;
  85. Log.Error = Debug.LogError;
  86. // create config from serialized settings
  87. config = new KcpConfig(DualMode, RecvBufferSize, SendBufferSize, MTU, NoDelay, Interval, FastResend, CongestionWindow, SendWindowSize, ReceiveWindowSize, Timeout, MaxRetransmit);
  88. // client (NonAlloc version is not necessary anymore)
  89. client = new KcpClient(
  90. OnThreadedClientConnected,
  91. (message, channel) => OnThreadedClientReceive(message, KcpTransport.FromKcpChannel(channel)),
  92. OnThreadedClientDisconnected,
  93. (error, reason) => OnThreadedClientError(KcpTransport.ToTransportError(error), reason),
  94. config
  95. );
  96. // server
  97. server = new KcpServer(
  98. OnThreadedServerConnected,
  99. (connectionId, message, channel) => OnThreadedServerReceive(connectionId, message, KcpTransport.FromKcpChannel(channel)),
  100. OnThreadedServerDisconnected,
  101. (connectionId, error, reason) => OnThreadedServerError(connectionId, KcpTransport.ToTransportError(error), reason),
  102. config
  103. );
  104. if (statisticsLog)
  105. InvokeRepeating(nameof(OnLogStatistics), 1, 1);
  106. // call base after creating kcp.
  107. // it'll be used by the created thread immediately.
  108. base.Awake();
  109. Log.Info("ThreadedKcpTransport initialized!");
  110. }
  111. protected virtual void OnValidate()
  112. {
  113. // show max message sizes in inspector for convenience.
  114. // 'config' isn't available in edit mode yet, so use MTU define.
  115. ReliableMaxMessageSize = KcpPeer.ReliableMaxMessageSize(MTU, ReceiveWindowSize);
  116. UnreliableMaxMessageSize = KcpPeer.UnreliableMaxMessageSize(MTU);
  117. }
  118. // copy MonoBehaviour.enabled for thread safe use
  119. void OnEnable() => enabledCopy = true;
  120. void OnDisable() => enabledCopy = true;
  121. // all except WebGL
  122. // Do not change this back to using Application.platform
  123. // because that doesn't work in the Editor!
  124. public override bool Available() =>
  125. #if UNITY_WEBGL
  126. false;
  127. #else
  128. true;
  129. #endif
  130. protected override void ThreadedClientConnect(string address) => client.Connect(address, Port);
  131. protected override void ThreadedClientConnect(Uri uri)
  132. {
  133. if (uri.Scheme != Scheme)
  134. throw new ArgumentException($"Invalid url {uri}, use {Scheme}://host:port instead", nameof(uri));
  135. int serverPort = uri.IsDefaultPort ? Port : uri.Port;
  136. client.Connect(uri.Host, (ushort)serverPort);
  137. }
  138. protected override void ThreadedClientSend(ArraySegment<byte> segment, int channelId)
  139. {
  140. client.Send(segment, KcpTransport.ToKcpChannel(channelId));
  141. // thread safe version for statistics
  142. OnThreadedClientSend(segment, channelId);
  143. }
  144. protected override void ThreadedClientDisconnect() => client.Disconnect();
  145. // process incoming in early update
  146. protected override void ThreadedClientEarlyUpdate()
  147. {
  148. // only process messages while transport is enabled.
  149. // scene change messsages disable it to stop processing.
  150. // (see also: https://github.com/vis2k/Mirror/pull/379)
  151. // => enabledCopy for thread safe use
  152. if (enabledCopy) client.TickIncoming();
  153. }
  154. // process outgoing in late update
  155. protected override void ThreadedClientLateUpdate() => client.TickOutgoing();
  156. // server thread overrides
  157. public override Uri ServerUri()
  158. {
  159. UriBuilder builder = new UriBuilder();
  160. builder.Scheme = Scheme;
  161. builder.Host = Dns.GetHostName();
  162. builder.Port = Port;
  163. return builder.Uri;
  164. }
  165. protected override void ThreadedServerStart() => server.Start(Port);
  166. protected override void ThreadedServerSend(int connectionId, ArraySegment<byte> segment, int channelId)
  167. {
  168. server.Send(connectionId, segment, KcpTransport.ToKcpChannel(channelId));
  169. // thread safe version for statistics
  170. OnThreadedServerSend(connectionId, segment, channelId);
  171. }
  172. protected override void ThreadedServerDisconnect(int connectionId) => server.Disconnect(connectionId);
  173. /* NOT THREAD SAFE. ThreadedTransport version throws NotImplementedException for this.
  174. public override string ServerGetClientAddress(int connectionId)
  175. {
  176. IPEndPoint endPoint = server.GetClientEndPoint(connectionId);
  177. return endPoint != null
  178. // Map to IPv4 if "IsIPv4MappedToIPv6"
  179. // "::ffff:127.0.0.1" -> "127.0.0.1"
  180. ? (endPoint.Address.IsIPv4MappedToIPv6
  181. ? endPoint.Address.MapToIPv4().ToString()
  182. : endPoint.Address.ToString())
  183. : "";
  184. }
  185. */
  186. protected override void ThreadedServerStop() => server.Stop();
  187. protected override void ThreadedServerEarlyUpdate()
  188. {
  189. // only process messages while transport is enabled.
  190. // scene change messsages disable it to stop processing.
  191. // (see also: https://github.com/vis2k/Mirror/pull/379)
  192. // => enabledCopy for thread safe use
  193. if (enabledCopy) server.TickIncoming();
  194. }
  195. // process outgoing in late update
  196. protected override void ThreadedServerLateUpdate() => server.TickOutgoing();
  197. protected override void ThreadedShutdown() {}
  198. // max message size
  199. public override int GetMaxPacketSize(int channelId = Channels.Reliable)
  200. {
  201. // switch to kcp channel.
  202. // unreliable or reliable.
  203. // default to reliable just to be sure.
  204. switch (channelId)
  205. {
  206. case Channels.Unreliable:
  207. return KcpPeer.UnreliableMaxMessageSize(config.Mtu);
  208. default:
  209. return KcpPeer.ReliableMaxMessageSize(config.Mtu, ReceiveWindowSize);
  210. }
  211. }
  212. // kcp reliable channel max packet size is MTU * WND_RCV
  213. // this allows 144kb messages. but due to head of line blocking, all
  214. // other messages would have to wait until the maxed size one is
  215. // delivered. batching 144kb messages each time would be EXTREMELY slow
  216. // and fill the send queue nearly immediately when using it over the
  217. // network.
  218. // => instead we always use MTU sized batches.
  219. // => people can still send maxed size if needed.
  220. public override int GetBatchThreshold(int channelId) =>
  221. KcpPeer.UnreliableMaxMessageSize(config.Mtu);
  222. protected virtual void OnGUIStatistics()
  223. {
  224. // TODO not thread safe
  225. /*
  226. GUILayout.BeginArea(new Rect(5, 110, 300, 300));
  227. if (ServerActive())
  228. {
  229. GUILayout.BeginVertical("Box");
  230. GUILayout.Label("SERVER");
  231. GUILayout.Label($" connections: {server.connections.Count}");
  232. GUILayout.Label($" MaxSendRate (avg): {KcpTransport.PrettyBytes(GetAverageMaxSendRate())}/s");
  233. GUILayout.Label($" MaxRecvRate (avg): {KcpTransport.PrettyBytes(GetAverageMaxReceiveRate())}/s");
  234. GUILayout.Label($" SendQueue: {GetTotalSendQueue()}");
  235. GUILayout.Label($" ReceiveQueue: {GetTotalReceiveQueue()}");
  236. GUILayout.Label($" SendBuffer: {GetTotalSendBuffer()}");
  237. GUILayout.Label($" ReceiveBuffer: {GetTotalReceiveBuffer()}");
  238. GUILayout.EndVertical();
  239. }
  240. if (ClientConnected())
  241. {
  242. GUILayout.BeginVertical("Box");
  243. GUILayout.Label("CLIENT");
  244. GUILayout.Label($" MaxSendRate: {KcpTransport.PrettyBytes(client.peer.MaxSendRate)}/s");
  245. GUILayout.Label($" MaxRecvRate: {KcpTransport.PrettyBytes(client.peer.MaxReceiveRate)}/s");
  246. GUILayout.Label($" SendQueue: {client.peer.SendQueueCount}");
  247. GUILayout.Label($" ReceiveQueue: {client.peer.ReceiveQueueCount}");
  248. GUILayout.Label($" SendBuffer: {client.peer.SendBufferCount}");
  249. GUILayout.Label($" ReceiveBuffer: {client.peer.ReceiveBufferCount}");
  250. GUILayout.EndVertical();
  251. }
  252. GUILayout.EndArea();
  253. */
  254. }
  255. // OnGUI allocates even if it does nothing. avoid in release.
  256. #if UNITY_EDITOR || DEVELOPMENT_BUILD
  257. protected virtual void OnGUI()
  258. {
  259. if (statisticsGUI) OnGUIStatistics();
  260. }
  261. #endif
  262. protected virtual void OnLogStatistics()
  263. {
  264. // TODO not thread safe
  265. /*
  266. if (ServerActive())
  267. {
  268. string log = "kcp SERVER @ time: " + NetworkTime.localTime + "\n";
  269. log += $" connections: {server.connections.Count}\n";
  270. log += $" MaxSendRate (avg): {KcpTransport.PrettyBytes(GetAverageMaxSendRate())}/s\n";
  271. log += $" MaxRecvRate (avg): {KcpTransport.PrettyBytes(GetAverageMaxReceiveRate())}/s\n";
  272. log += $" SendQueue: {GetTotalSendQueue()}\n";
  273. log += $" ReceiveQueue: {GetTotalReceiveQueue()}\n";
  274. log += $" SendBuffer: {GetTotalSendBuffer()}\n";
  275. log += $" ReceiveBuffer: {GetTotalReceiveBuffer()}\n\n";
  276. Log.Info(log);
  277. }
  278. if (ClientConnected())
  279. {
  280. string log = "kcp CLIENT @ time: " + NetworkTime.localTime + "\n";
  281. log += $" MaxSendRate: {KcpTransport.PrettyBytes(client.peer.MaxSendRate)}/s\n";
  282. log += $" MaxRecvRate: {KcpTransport.PrettyBytes(client.peer.MaxReceiveRate)}/s\n";
  283. log += $" SendQueue: {client.peer.SendQueueCount}\n";
  284. log += $" ReceiveQueue: {client.peer.ReceiveQueueCount}\n";
  285. log += $" SendBuffer: {client.peer.SendBufferCount}\n";
  286. log += $" ReceiveBuffer: {client.peer.ReceiveBufferCount}\n\n";
  287. Log.Info(log);
  288. }
  289. */
  290. }
  291. public override string ToString() => $"ThreadedKCP {port}";
  292. }
  293. }
  294. //#endif MIRROR <- commented out because MIRROR isn't defined on first import yet