123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- using System;
- using System.Net.Sockets;
- using System.Threading;
- namespace Telepathy
- {
- // ClientState OBJECT that can be handed to the ReceiveThread safely.
- // => allows us to create a NEW OBJECT every time we connect and start a
- // receive thread.
- // => perfectly protects us against data races. fixes all the flaky tests
- // where .Connecting or .client would still be used by a dieing thread
- // while attempting to use it for a new connection attempt etc.
- // => creating a fresh client state each time is the best solution against
- // data races here!
- class ClientConnectionState : ConnectionState
- {
- public Thread receiveThread;
- // TcpClient.Connected doesn't check if socket != null, which
- // results in NullReferenceExceptions if connection was closed.
- // -> let's check it manually instead
- public bool Connected => client != null &&
- client.Client != null &&
- client.Client.Connected;
- // TcpClient has no 'connecting' state to check. We need to keep track
- // of it manually.
- // -> checking 'thread.IsAlive && !Connected' is not enough because the
- // thread is alive and connected is false for a short moment after
- // disconnecting, so this would cause race conditions.
- // -> we use a threadsafe bool wrapper so that ThreadFunction can remain
- // static (it needs a common lock)
- // => Connecting is true from first Connect() call in here, through the
- // thread start, until TcpClient.Connect() returns. Simple and clear.
- // => bools are atomic according to
- // https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/variables
- // made volatile so the compiler does not reorder access to it
- public volatile bool Connecting;
- // thread safe pipe for received messages
- // => inside client connection state so that we can create a new state
- // each time we connect
- // (unlike server which has one receive pipe for all connections)
- public readonly MagnificentReceivePipe receivePipe;
- // constructor always creates new TcpClient for client connection!
- public ClientConnectionState(int MaxMessageSize) : base(new TcpClient(), MaxMessageSize)
- {
- // create receive pipe with max message size for pooling
- receivePipe = new MagnificentReceivePipe(MaxMessageSize);
- }
- // dispose all the state safely
- public void Dispose()
- {
- // close client
- client.Close();
- // wait until thread finished. this is the only way to guarantee
- // that we can call Connect() again immediately after Disconnect
- // -> calling .Join would sometimes wait forever, e.g. when
- // calling Disconnect while trying to connect to a dead end
- receiveThread?.Interrupt();
- // we interrupted the receive Thread, so we can't guarantee that
- // connecting was reset. let's do it manually.
- Connecting = false;
- // clear send pipe. no need to hold on to elements.
- // (unlike receiveQueue, which is still needed to process the
- // latest Disconnected message, etc.)
- sendPipe.Clear();
- // IMPORTANT: DO NOT CLEAR RECEIVE PIPE.
- // we still want to process disconnect messages in Tick()!
- // let go of this client completely. the thread ended, no one uses
- // it anymore and this way Connected is false again immediately.
- client = null;
- }
- }
- public class Client : Common
- {
- // events to hook into
- // => OnData uses ArraySegment for allocation free receives later
- public Action OnConnected;
- public Action<ArraySegment<byte>> OnData;
- public Action OnDisconnected;
- // disconnect if send queue gets too big.
- // -> avoids ever growing queue memory if network is slower than input
- // -> disconnecting is great for load balancing. better to disconnect
- // one connection than risking every connection / the whole server
- // -> huge queue would introduce multiple seconds of latency anyway
- //
- // Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
- // limit = 1,000 means 16 MB of memory/connection
- // limit = 10,000 means 160 MB of memory/connection
- public int SendQueueLimit = 10000;
- public int ReceiveQueueLimit = 10000;
- // all client state wrapped into an object that is passed to ReceiveThread
- // => we create a new one each time we connect to avoid data races with
- // old dieing threads still using the previous object!
- ClientConnectionState state;
- // Connected & Connecting
- public bool Connected => state != null && state.Connected;
- public bool Connecting => state != null && state.Connecting;
- // pipe count, useful for debugging / benchmarks
- public int ReceivePipeCount => state != null ? state.receivePipe.TotalCount : 0;
- // constructor
- public Client(int MaxMessageSize) : base(MaxMessageSize) {}
- // the thread function
- // STATIC to avoid sharing state!
- // => pass ClientState object. a new one is created for each new thread!
- // => avoids data races where an old dieing thread might still modify
- // the current thread's state :/
- static void ReceiveThreadFunction(ClientConnectionState state, string ip, int port, int MaxMessageSize, bool NoDelay, int SendTimeout, int ReceiveTimeout, int ReceiveQueueLimit)
- {
- Thread sendThread = null;
- // absolutely must wrap with try/catch, otherwise thread
- // exceptions are silent
- try
- {
- // connect (blocking)
- state.client.Connect(ip, port);
- state.Connecting = false; // volatile!
- // set socket options after the socket was created in Connect()
- // (not after the constructor because we clear the socket there)
- state.client.NoDelay = NoDelay;
- state.client.SendTimeout = SendTimeout;
- state.client.ReceiveTimeout = ReceiveTimeout;
- // start send thread only after connected
- // IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
- sendThread = new Thread(() => { ThreadFunctions.SendLoop(0, state.client, state.sendPipe, state.sendPending); });
- sendThread.IsBackground = true;
- sendThread.Start();
- // run the receive loop
- // (receive pipe is shared across all loops)
- ThreadFunctions.ReceiveLoop(0, state.client, MaxMessageSize, state.receivePipe, ReceiveQueueLimit);
- }
- catch (SocketException exception)
- {
- // this happens if (for example) the ip address is correct
- // but there is no server running on that ip/port
- Log.Info("Client Recv: failed to connect to ip=" + ip + " port=" + port + " reason=" + exception);
- // add 'Disconnected' event to receive pipe so that the caller
- // knows that the Connect failed. otherwise they will never know
- state.receivePipe.Enqueue(0, EventType.Disconnected, default);
- }
- catch (ThreadInterruptedException)
- {
- // expected if Disconnect() aborts it
- }
- catch (ThreadAbortException)
- {
- // expected if Disconnect() aborts it
- }
- catch (ObjectDisposedException)
- {
- // expected if Disconnect() aborts it and disposed the client
- // while ReceiveThread is in a blocking Connect() call
- }
- catch (Exception exception)
- {
- // something went wrong. probably important.
- Log.Error("Client Recv Exception: " + exception);
- }
- // sendthread might be waiting on ManualResetEvent,
- // so let's make sure to end it if the connection
- // closed.
- // otherwise the send thread would only end if it's
- // actually sending data while the connection is
- // closed.
- sendThread?.Interrupt();
- // Connect might have failed. thread might have been closed.
- // let's reset connecting state no matter what.
- state.Connecting = false;
- // if we got here then we are done. ReceiveLoop cleans up already,
- // but we may never get there if connect fails. so let's clean up
- // here too.
- state.client?.Close();
- }
- public void Connect(string ip, int port)
- {
- // not if already started
- if (Connecting || Connected)
- {
- Log.Warning("Telepathy Client can not create connection because an existing connection is connecting or connected");
- return;
- }
- // overwrite old thread's state object. create a new one to avoid
- // data races where an old dieing thread might still modify the
- // current state! fixes all the flaky tests!
- state = new ClientConnectionState(MaxMessageSize);
- // We are connecting from now until Connect succeeds or fails
- state.Connecting = true;
- // create a TcpClient with perfect IPv4, IPv6 and hostname resolving
- // support.
- //
- // * TcpClient(hostname, port): works but would connect (and block)
- // already
- // * TcpClient(AddressFamily.InterNetworkV6): takes Ipv4 and IPv6
- // addresses but only connects to IPv6 servers (e.g. Telepathy).
- // does NOT connect to IPv4 servers (e.g. Mirror Booster), even
- // with DualMode enabled.
- // * TcpClient(): creates IPv4 socket internally, which would force
- // Connect() to only use IPv4 sockets.
- //
- // => the trick is to clear the internal IPv4 socket so that Connect
- // resolves the hostname and creates either an IPv4 or an IPv6
- // socket as needed (see TcpClient source)
- state.client.Client = null; // clear internal IPv4 socket until Connect()
- // client.Connect(ip, port) is blocking. let's call it in the thread
- // and return immediately.
- // -> this way the application doesn't hang for 30s if connect takes
- // too long, which is especially good in games
- // -> this way we don't async client.BeginConnect, which seems to
- // fail sometimes if we connect too many clients too fast
- state.receiveThread = new Thread(() => {
- ReceiveThreadFunction(state, ip, port, MaxMessageSize, NoDelay, SendTimeout, ReceiveTimeout, ReceiveQueueLimit);
- });
- state.receiveThread.IsBackground = true;
- state.receiveThread.Start();
- }
- public void Disconnect()
- {
- // only if started
- if (Connecting || Connected)
- {
- // dispose all the state safely
- state.Dispose();
- // IMPORTANT: DO NOT set state = null!
- // we still want to process the pipe's disconnect message etc.!
- }
- }
- // send message to server using socket connection.
- // arraysegment for allocation free sends later.
- // -> the segment's array is only used until Send() returns!
- public bool Send(ArraySegment<byte> message)
- {
- if (Connected)
- {
- // respect max message size to avoid allocation attacks.
- if (message.Count <= MaxMessageSize)
- {
- // check send pipe limit
- if (state.sendPipe.Count < SendQueueLimit)
- {
- // add to thread safe send pipe and return immediately.
- // calling Send here would be blocking (sometimes for long
- // times if other side lags or wire was disconnected)
- state.sendPipe.Enqueue(message);
- state.sendPending.Set(); // interrupt SendThread WaitOne()
- return true;
- }
- // disconnect if send queue gets too big.
- // -> avoids ever growing queue memory if network is slower
- // than input
- // -> avoids ever growing latency as well
- //
- // note: while SendThread always grabs the WHOLE send queue
- // immediately, it's still possible that the sending
- // blocks for so long that the send queue just gets
- // way too big. have a limit - better safe than sorry.
- else
- {
- // log the reason
- Log.Warning($"Client.Send: sendPipe reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting to avoid ever growing memory & latency.");
- // just close it. send thread will take care of the rest.
- state.client.Close();
- return false;
- }
- }
- Log.Error("Client.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
- return false;
- }
- Log.Warning("Client.Send: not connected!");
- return false;
- }
- // tick: processes up to 'limit' messages
- // => limit parameter to avoid deadlocks / too long freezes if server or
- // client is too slow to process network load
- // => Mirror & DOTSNET need to have a process limit anyway.
- // might as well do it here and make life easier.
- // => returns amount of remaining messages to process, so the caller
- // can call tick again as many times as needed (or up to a limit)
- //
- // Tick() may process multiple messages, but Mirror needs a way to stop
- // processing immediately if a scene change messages arrives. Mirror
- // can't process any other messages during a scene change.
- // (could be useful for others too)
- // => make sure to allocate the lambda only once in transports
- public int Tick(int processLimit, Func<bool> checkEnabled = null)
- {
- // only if state was created yet (after connect())
- // note: we don't check 'only if connected' because we want to still
- // process Disconnect messages afterwards too!
- if (state == null)
- return 0;
- // process up to 'processLimit' messages
- for (int i = 0; i < processLimit; ++i)
- {
- // check enabled in case a Mirror scene message arrived
- if (checkEnabled != null && !checkEnabled())
- break;
- // peek first. allows us to process the first queued entry while
- // still keeping the pooled byte[] alive by not removing anything.
- if (state.receivePipe.TryPeek(out int _, out EventType eventType, out ArraySegment<byte> message))
- {
- switch (eventType)
- {
- case EventType.Connected:
- OnConnected?.Invoke();
- break;
- case EventType.Data:
- OnData?.Invoke(message);
- break;
- case EventType.Disconnected:
- OnDisconnected?.Invoke();
- break;
- }
- // IMPORTANT: now dequeue and return it to pool AFTER we are
- // done processing the event.
- state.receivePipe.TryDequeue();
- }
- // no more messages. stop the loop.
- else break;
- }
- // return what's left to process for next time
- return state.receivePipe.TotalCount;
- }
- }
- }
|