1
0
mirror of https://github.com/Ylianst/MeshCentralRouter synced 2025-12-06 00:13:33 +00:00

Native web socket improvements.

This commit is contained in:
Ylian Saint-Hilaire
2021-10-30 12:07:54 -07:00
parent 0bcb7a0164
commit 4f2401a796
2 changed files with 67 additions and 34 deletions

View File

@@ -20,7 +20,6 @@ using System.Windows.Forms;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Web.Script.Serialization;
using Microsoft.Win32;
using System.Threading;
namespace MeshCentralRouter

View File

@@ -66,12 +66,12 @@ namespace MeshCentralRouter
private System.Threading.Timer pongTimer = null;
private bool pendingSendCall = false;
private MemoryStream pendingSendBuffer = null;
public long PendingSendLength { get { return (pendingSendBuffer == null)? 0 : pendingSendBuffer.Length; } }
private bool readPaused = false;
private bool shouldRead = false;
private RNGCryptoServiceProvider CryptoRandom = new RNGCryptoServiceProvider();
private object mainLock = new object();
public TLSCertificateCheck TLSCertCheck = TLSCertificateCheck.Verify;
public X509Certificate2 tlsCert = null;
public X509Certificate2 failedTlsCert = null;
static public bool nativeWebSocketFirst = true;
private SemaphoreSlim receiveLock = new SemaphoreSlim(1, 1);
@@ -102,6 +102,8 @@ namespace MeshCentralRouter
NoError = 0
}
public long PendingSendLength { get { if (ws != null) { lock (pendingSends) { return pendingSends.Count; } } else { return (pendingSendBuffer == null) ? 0 : pendingSendBuffer.Length; } } }
private void TlsDump(string direction, byte[] data, int offset, int len) { if (tlsdump) { try { File.AppendAllText("debug.log", direction + ": " + BitConverter.ToString(data, offset, len).Replace("-", string.Empty) + "\r\n"); } catch (Exception) { } } }
public delegate void onBinaryDataHandler(webSocketClient sender, byte[] data, int offset, int length, int orglen);
@@ -117,7 +119,7 @@ namespace MeshCentralRouter
public ConnectionStates State { get { return state; } }
public X509Certificate RemoteCertificate { get { try { return wsstream.RemoteCertificate; } catch (Exception) { return null; } } }
public X509Certificate RemoteCertificate { get { if (tlsCert != null) return tlsCert; try { return wsstream.RemoteCertificate; } catch (Exception) { return null; } } }
private void SetState(ConnectionStates newstate)
{
@@ -320,7 +322,7 @@ namespace MeshCentralRouter
else
{
// Read more proxy data
try { wsrawstream.BeginRead(readBuffer, readBufferLen, readBuffer.Length - readBufferLen, new AsyncCallback(OnProxyResponseSink), this); } catch (Exception) { Dispose(); }
try { wsrawstream.BeginRead(readBuffer, readBufferLen, readBuffer.Length - readBufferLen, new AsyncCallback(OnProxyResponseSink), this); } catch (Exception) { Dispose(); }
}
}
}
@@ -367,9 +369,12 @@ namespace MeshCentralRouter
// Send the HTTP headers
Log("Websocket TLS setup, sending HTTP header...");
string header;
if (AllowCompression) {
if (AllowCompression)
{
header = "GET " + url.PathAndQuery + " HTTP/1.1\r\nHost: " + url.Host + "\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover\r\n" + extraHeadersStr + "\r\n";
} else {
}
else
{
header = "GET " + url.PathAndQuery + " HTTP/1.1\r\nHost: " + url.Host + "\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\nSec-WebSocket-Version: 13\r\n" + extraHeadersStr + "\r\n";
}
SendData(UTF8Encoding.UTF8.GetBytes(header));
@@ -556,15 +561,18 @@ namespace MeshCentralRouter
if (((op & 0x40) != 0) && (inflateMemory != null))
{
// This is a deflate compressed frame
inflateMemory.SetLength(0);
inflateMemory.Write(data, offset, len);
inflateMemory.Write(inflateEnd, 0, 4);
inflateMemory.Seek(0, SeekOrigin.Begin);
MemoryStream memoryStream = new MemoryStream();
inflate.CopyTo(memoryStream);
data = memoryStream.GetBuffer();
offset = 0;
len = (int)memoryStream.Length;
lock (inflateMemory)
{
inflateMemory.SetLength(0);
inflateMemory.Write(data, offset, len);
inflateMemory.Write(inflateEnd, 0, 4);
inflateMemory.Seek(0, SeekOrigin.Begin);
MemoryStream memoryStream = new MemoryStream();
inflate.CopyTo(memoryStream);
data = memoryStream.GetBuffer();
offset = 0;
len = (int)memoryStream.Length;
}
}
switch (op & 0x0F)
@@ -678,30 +686,31 @@ namespace MeshCentralRouter
if (state != ConnectionStates.Connected) return 0;
Log("WebSocketClient-SEND-String: " + data);
byte[] buf = UTF8Encoding.UTF8.GetBytes(data);
return SendFragment(buf, 0, buf.Length, 129);
return SendFragment(buf, 0, buf.Length, 129, true);
}
public int SendBinary(byte[] data)
{
Log("WebSocketClient-SEND-Binary-Len:" + data.Length);
return SendFragment(data, 0, data.Length, 130);
return SendFragment(data, 0, data.Length, 130, false);
}
public int SendBinary(byte[] data, int offset, int len) {
public int SendBinary(byte[] data, int offset, int len)
{
Log("WebSocketClient-SEND-Binary-Len:" + len);
return SendFragment(data, offset, len, 130);
return SendFragment(data, offset, len, 130, false);
}
public int SendPing(byte[] data, int offset, int len)
{
Log("WebSocketClient-SEND-Ping");
return SendFragment(null, 0, 0, 137);
return SendFragment(null, 0, 0, 137, true);
}
public int SendPong(byte[] data, int offset, int len)
{
Log("WebSocketClient-SEND-Pong");
return SendFragment(null, 0, 0, 138);
return SendFragment(null, 0, 0, 138, true);
}
// This controls the flow of fragments being sent, queuing send operations if needed
@@ -717,18 +726,30 @@ namespace MeshCentralRouter
}
// Fragment op code (129 = text, 130 = binary)
public int SendFragment(byte[] data, int offset, int len, byte op)
// ownershipReleased = True is the memory in data is not owned anymore.
private int SendFragment(byte[] data, int offset, int len, byte op, bool ownershipReleased)
{
TlsDump("Out(" + op + ")", data, offset, len);
if (ws != null)
{
if ((data == null) || (len == 0)) return 0;
if (ownershipReleased == false)
{
// Since this is going into a aynsc send or in a queue, copy the outgoing data into a new buffer.
byte[] buf = new byte[len];
Array.Copy(data, offset, buf, 0, len);
data = buf;
offset = 0;
}
// Using native websocket
lock (pendingSends)
{
if (pendingSend != null)
{
// A send operating is already being processes, queue this send.
pendingSends.Add(new pendingSendClass(data, offset, len, op));
pendingSends.Add(new pendingSendClass(data, 0, len, op));
}
else
{
@@ -753,17 +774,17 @@ namespace MeshCentralRouter
if ((deflateMemory != null) && (len > 32) && (AllowCompression))
{
deflateMemory.SetLength(0);
deflateMemory.Write(inflateStart, 0, 14);
DeflateStream deflate = new DeflateStream(deflateMemory, CompressionMode.Compress, true);
deflate.Write(data, offset, len);
deflate.Dispose();
deflate = null;
if (deflateMemory.Length < len)
{
// Use the compressed data
// Copy to a new buffer, this is needed because we do async send operation
int newlen = (int)deflateMemory.Length;
buf = deflateMemory.GetBuffer();
len = newlen - 14;
buf = new byte[14 + newlen];
Array.Copy(deflateMemory.GetBuffer(), 0, buf, 14, newlen);
len = newlen;
op |= 0x40; // Add compression op
}
else
@@ -776,7 +797,7 @@ namespace MeshCentralRouter
}
else
{
// Convert the string into a buffer with 4 byte of header space.
// Convert the string into a buffer with 14 bytea of header space.
buf = new byte[14 + len];
if (len > 0) { Array.Copy(data, offset, buf, 14, len); }
}
@@ -877,9 +898,12 @@ namespace MeshCentralRouter
bool IsSuccess = Win32Api.WinHttpGetProxyForUrl(WinHttpSession, DestinationUrl, ref ProxyOptions, ref ProxyInfo);
Win32Api.WinHttpCloseHandle(WinHttpSession);
if (IsSuccess) {
if (IsSuccess)
{
return ProxyInfo.lpszProxy;
} else {
}
else
{
Console.WriteLine("Error: {0}", Win32Api.GetLastError());
return null;
}
@@ -916,7 +940,7 @@ namespace MeshCentralRouter
}
}
private async Task ReceiveLoop()
private void ReceiveLoop()
{
SetState(ConnectionStates.Connected);
var loopToken = CTS.Token;
@@ -931,9 +955,19 @@ namespace MeshCentralRouter
outputStream = new MemoryStream(8192);
do
{
receiveResult = await ws.ReceiveAsync(bufferEx, CTS.Token);
if (receiveResult.MessageType != WebSocketMessageType.Close)
outputStream.Write(buffer, 0, receiveResult.Count);
try
{
Task<WebSocketReceiveResult> t = ws.ReceiveAsync(bufferEx, CTS.Token);
t.Wait();
receiveResult = t.Result;
if (receiveResult.MessageType != WebSocketMessageType.Close) { outputStream.Write(buffer, 0, receiveResult.Count); }
}
catch (Exception)
{
outputStream?.Dispose();
SetState(0);
return;
}
}
while (!receiveResult.EndOfMessage);
if (receiveResult.MessageType == WebSocketMessageType.Close) break;