Skip to content

Commit d502d9c

Browse files
committed
perf improvements
1 parent e57991c commit d502d9c

3 files changed

Lines changed: 117 additions & 38 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"name": "colyseus.nativewebsocket"
3+
}

integrations/Unity/endel.nativewebsocket.asmdef

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/NativeWebSocket/WebSocket.cs

Lines changed: 114 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class WebSocket : IWebSocket
2828
private List<Action> m_DispatchQueue = new List<Action>();
2929
private readonly object EventQueueLock = new object();
3030

31+
private List<byte[]> m_MessageQueue = new List<byte[]>();
32+
private List<byte[]> m_MessageDispatchQueue = new List<byte[]>();
33+
private readonly object MessageQueueLock = new object();
34+
3135
private readonly object OutgoingMessageLock = new object();
3236
private bool isSending = false;
3337
private Queue<ArraySegment<byte>> sendBytesQueue = new Queue<ArraySegment<byte>>();
@@ -84,12 +88,46 @@ private void EnqueueEvent(Action action)
8488
}
8589
}
8690

91+
private void EnqueueMessage(byte[] data)
92+
{
93+
if (_syncContext != null)
94+
{
95+
_syncContext.Post(_ => OnMessage?.Invoke(data), null);
96+
}
97+
else
98+
{
99+
lock (MessageQueueLock)
100+
{
101+
m_MessageQueue.Add(data);
102+
}
103+
}
104+
}
105+
87106
/// <summary>
88107
/// Dispatches queued events when no SynchronizationContext is available.
89108
/// Not needed when a SynchronizationContext is present (Unity, Godot, MonoGame with WebSocketGameComponent).
90109
/// </summary>
91110
public void DispatchMessageQueue()
92111
{
112+
// Hot path: dispatch messages without closure overhead
113+
if (m_MessageQueue.Count > 0)
114+
{
115+
lock (MessageQueueLock)
116+
{
117+
var tmp = m_MessageDispatchQueue;
118+
m_MessageDispatchQueue = m_MessageQueue;
119+
m_MessageQueue = tmp;
120+
}
121+
122+
for (int i = 0; i < m_MessageDispatchQueue.Count; i++)
123+
{
124+
OnMessage?.Invoke(m_MessageDispatchQueue[i]);
125+
}
126+
127+
m_MessageDispatchQueue.Clear();
128+
}
129+
130+
// Rare events: OnOpen, OnError, OnClose
93131
if (m_EventQueue.Count == 0) return;
94132

95133
lock (EventQueueLock)
@@ -179,75 +217,116 @@ public WebSocketState State
179217

180218
public Task Send(byte[] bytes)
181219
{
182-
return SendMessage(sendBytesQueue, WebSocketMessageType.Binary, new ArraySegment<byte>(bytes));
220+
if (bytes.Length == 0 || State != WebSocketState.Open)
221+
return Task.CompletedTask;
222+
223+
var segment = new ArraySegment<byte>(bytes);
224+
225+
lock (OutgoingMessageLock)
226+
{
227+
if (isSending)
228+
{
229+
sendBytesQueue.Enqueue(segment);
230+
return Task.CompletedTask;
231+
}
232+
isSending = true;
233+
}
234+
235+
return SendAndDrainAsync(sendBytesQueue, WebSocketMessageType.Binary, segment);
183236
}
184237

185238
public Task SendText(string message)
186239
{
240+
if (State != WebSocketState.Open)
241+
return Task.CompletedTask;
242+
187243
var encoded = Encoding.UTF8.GetBytes(message);
188-
return SendMessage(sendTextQueue, WebSocketMessageType.Text, new ArraySegment<byte>(encoded, 0, encoded.Length));
244+
if (encoded.Length == 0)
245+
return Task.CompletedTask;
246+
247+
var segment = new ArraySegment<byte>(encoded, 0, encoded.Length);
248+
249+
lock (OutgoingMessageLock)
250+
{
251+
if (isSending)
252+
{
253+
sendTextQueue.Enqueue(segment);
254+
return Task.CompletedTask;
255+
}
256+
isSending = true;
257+
}
258+
259+
return SendAndDrainAsync(sendTextQueue, WebSocketMessageType.Text, segment);
189260
}
190261

191-
private async Task SendMessage(Queue<ArraySegment<byte>> queue, WebSocketMessageType messageType, ArraySegment<byte> buffer)
262+
private Task SendAndDrainAsync(Queue<ArraySegment<byte>> queue, WebSocketMessageType messageType, ArraySegment<byte> buffer)
192263
{
193-
if (buffer.Count == 0 || State != WebSocketState.Open)
264+
// Try synchronous fast path: avoid async state machine when SendAsync completes inline
265+
var sendTask = m_Socket.SendAsync(buffer, messageType, true, m_CancellationToken);
266+
if (sendTask.Status == TaskStatus.RanToCompletion)
194267
{
195-
return;
268+
return DrainQueueSync(queue, messageType);
196269
}
197270

198-
bool sending;
271+
return AwaitAndDrainAsync(sendTask, queue, messageType);
272+
}
199273

200-
lock (OutgoingMessageLock)
274+
private Task DrainQueueSync(Queue<ArraySegment<byte>> queue, WebSocketMessageType messageType)
275+
{
276+
while (true)
201277
{
202-
sending = isSending;
278+
ArraySegment<byte> next;
279+
lock (OutgoingMessageLock)
280+
{
281+
if (queue.Count == 0)
282+
{
283+
isSending = false;
284+
return Task.CompletedTask;
285+
}
286+
next = queue.Dequeue();
287+
}
203288

204-
if (!isSending)
289+
var sendTask = m_Socket.SendAsync(next, messageType, true, m_CancellationToken);
290+
if (sendTask.Status != TaskStatus.RanToCompletion)
205291
{
206-
isSending = true;
292+
// This send went async — fall through to async drain
293+
return AwaitAndDrainAsync(sendTask, queue, messageType);
207294
}
208295
}
296+
}
209297

210-
if (!sending)
298+
private async Task AwaitAndDrainAsync(Task pendingSend, Queue<ArraySegment<byte>> queue, WebSocketMessageType messageType)
299+
{
300+
try
211301
{
212-
try
213-
{
214-
await m_Socket.SendAsync(buffer, messageType, true, m_CancellationToken).ConfigureAwait(false);
302+
await pendingSend.ConfigureAwait(false);
215303

216-
// Drain the queue iteratively instead of recursively
217-
while (true)
218-
{
219-
ArraySegment<byte> next;
220-
lock (OutgoingMessageLock)
221-
{
222-
if (queue.Count == 0)
223-
break;
224-
next = queue.Dequeue();
225-
}
226-
227-
await m_Socket.SendAsync(next, messageType, true, m_CancellationToken).ConfigureAwait(false);
228-
}
229-
}
230-
finally
304+
while (true)
231305
{
306+
ArraySegment<byte> next;
232307
lock (OutgoingMessageLock)
233308
{
234-
isSending = false;
309+
if (queue.Count == 0)
310+
break;
311+
next = queue.Dequeue();
235312
}
313+
314+
await m_Socket.SendAsync(next, messageType, true, m_CancellationToken).ConfigureAwait(false);
236315
}
237316
}
238-
else
317+
finally
239318
{
240319
lock (OutgoingMessageLock)
241320
{
242-
queue.Enqueue(buffer);
321+
isSending = false;
243322
}
244323
}
245324
}
246325

247326
public async Task Receive()
248327
{
249328
WebSocketCloseCode closeCode = WebSocketCloseCode.Abnormal;
250-
ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[8192]);
329+
ArraySegment<byte> buffer = new ArraySegment<byte>(new byte[65536]);
251330
try
252331
{
253332
while (m_Socket.State == System.Net.WebSockets.WebSocketState.Open)
@@ -266,7 +345,7 @@ public async Task Receive()
266345
// Fast path: single-frame message, avoid MemoryStream
267346
var data = new byte[result.Count];
268347
Buffer.BlockCopy(buffer.Array, buffer.Offset, data, 0, result.Count);
269-
EnqueueEvent(() => OnMessage?.Invoke(data));
348+
EnqueueMessage(data);
270349
}
271350
else
272351
{
@@ -283,7 +362,7 @@ public async Task Receive()
283362
while (!result.EndOfMessage);
284363

285364
var data = ms.ToArray();
286-
EnqueueEvent(() => OnMessage?.Invoke(data));
365+
EnqueueMessage(data);
287366
}
288367
}
289368
}

0 commit comments

Comments
 (0)