1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-11 13:53:37 +00:00

Updated WebSocketStream, so when you call end on the decoded stream, it will call end on the encoded stream. This will cause all the references to detach, allowing timely garbage collection.

This commit is contained in:
Bryan Roe
2022-08-19 00:49:14 -07:00
parent 905aafd9b1
commit 36286d9e7d
2 changed files with 70 additions and 10 deletions

View File

@@ -4548,7 +4548,7 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu
void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *stream, void *user)
{
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (duk_ctx_shutting_down(state->ctx)) { return; }
if (!ILibMemory_CanaryOK(state) || duk_ctx_shutting_down(state->ctx)) { return; }
duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket]
duk_get_prop_string(state->ctx, -1, "decoded"); // [websocket][decoded]
@@ -4653,6 +4653,15 @@ void ILibDuktape_httpStream_webSocket_DecodedEndSink(ILibDuktape_DuplexStream *s
{
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
ILibDuktape_httpStream_webSocket_WriteWebSocketPacket(state, WEBSOCKET_OPCODE_CLOSE, NULL, 0, ILibWebClient_WebSocket_FragmentFlag_Complete);
//
// We need to call 'end' on the encoded stream, so that it can disconnect
//
duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket]
duk_get_prop_string(state->ctx, -1, "encoded"); // [websocket][encoded]
duk_prepare_method_call(state->ctx, -1, "end"); // [websocket][encoded][end][this]
duk_pcall_method(state->ctx, 0); duk_pop(state->ctx); // [websocket][encoded]
duk_pop_2(state->ctx); // ...
}
void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *user)
{
@@ -4678,7 +4687,7 @@ void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *
void ILibDuktape_httpStream_webSocket_DecodedPauseSink(ILibDuktape_DuplexStream *sender, void *user)
{
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (state == NULL || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR
if (state == NULL || !ILibMemory_CanaryOK(state) || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR
if (state->encodedStream->writableStream->pipedReadable_native != NULL && state->encodedStream->writableStream->pipedReadable_native->PauseHandler != NULL)
{

View File

@@ -16,7 +16,7 @@ limitations under the License.
*/
var duplex = require('stream').Duplex;
var promise = require('promise');
var http = require('http');
var http = require('https');
var processes = [];
//setModulePath('../modules');
@@ -33,7 +33,28 @@ var sample = new duplex(
{
}
});
var sample2 = new duplex(
{
'write': function (chunk, flush)
{
console.log(chunk.toString());
flush();
return (true);
},
'final': function (flush)
{
}
});
function sample_final()
{
console.log('Sample was finalized');
}
function sample2_final()
{
console.log('Sample2 was finalized');
}
sample.once('~', sample_final);
sample2.once('~', sample2_final);
if (process.platform == 'win32')
{
@@ -83,6 +104,9 @@ process.stdin.on('data', function (c)
if (c.toString() == null) { return; }
switch(c.toString().trim().toUpperCase())
{
case 'PIPE':
console.displayStreamPipeMessages = 1;
break;
case 'FINAL':
console.displayFinalizerMessages = 1;
break;
@@ -131,7 +155,9 @@ process.stdin.on('data', function (c)
console.log('Web Socket Server on port: ' + global.wsserver.address().port);
break;
case 'WSS4433':
global.wsserver = require('http').createServer();
console.log('Generating Cert...');
var cert = require('tls').generateCertificate('test', { certType: 2, noUsages: 1 });
global.wsserver = require('https').createServer({ pfx: cert, passphrase: 'test' });
global.wsserver.on('upgrade', wss_OnUpgrade);
global.wsserver.listen({ port: 4433 });
console.log('Web Socket Server on port: ' + global.wsserver.address().port);
@@ -141,12 +167,24 @@ process.stdin.on('data', function (c)
break;
case 'TUNEND':
global.tun.end();
//_debug();
global.tun.unpipe();
if (global.ipc) { global.ipc.unpipe(); }
global.tun = null;
global.ipc = null;
//sample.unpipe();
break;
case 'GC':
_debugGC();
break;
case 'SAMPLE':
sample.unpipe();
sample = null;
break;
case 'SAMPLEPIPE':
sample.pipe(sample2);
console.log('Sample Piped to Sample2');
break;
}
});
@@ -156,6 +194,7 @@ function wss_OnUpgrade(msg, sck, head)
{
case '/tunnel':
this.cws = sck.upgradeWebSocket();
this.cws.on('end', function () { console.log('Client WebSocket CLOSED'); });
console.log('Accepted Client WebSocket');
break;
}
@@ -175,16 +214,19 @@ function req_ws_upgrade(response, s, head)
global.tun = s;
_debug();
s.pipe(sample);
_debug();
//global.req = null;
////_debug();
//sample.pipe(s);
//s.pipe(sample);
////_debug();
////global.req = null;
}
function webSocketClientTest(port)
{
console.log('Initiating WebSocket');
var woptions = http.parseUri('ws://127.0.0.1:' + port + '/tunnel');
var woptions = http.parseUri('wss://127.0.0.1:' + port + '/tunnel');
woptions.rejectUnauthorized = 0;
var req = http.request(woptions);
req.on('upgrade', req_ws_upgrade);
req.once('~', req_finalized);
@@ -277,6 +319,14 @@ function startClient()
_debug();
});
this.on('data', function (c) { });
if(global.tun!=null)
{
global.ipc = this;
console.log('Piping Together Stuff');
global.tun.pipe(this);
this.pipe(global.tun);
}
});
ret.kill = function ()
{
@@ -320,6 +370,7 @@ function server_connection (c)
c.on('data', _data);
c.on('close', _close);
c.on('~', _f);
global.serverc = c;
}
function server_closed()