From 36286d9e7d03750ee884e868e80aade55eccafee Mon Sep 17 00:00:00 2001 From: Bryan Roe Date: Fri, 19 Aug 2022 00:49:14 -0700 Subject: [PATCH] Updated WebSocketStream, so when you call end on the decoded stream, it will call end on the encoded stream. This will cause all the references to detach, allowing timely garbage collection. --- microscript/ILibDuktape_HttpStream.c | 13 +++++- test/leaktest.js | 67 ++++++++++++++++++++++++---- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/microscript/ILibDuktape_HttpStream.c b/microscript/ILibDuktape_HttpStream.c index e46e613..9b79e37 100644 --- a/microscript/ILibDuktape_HttpStream.c +++ b/microscript/ILibDuktape_HttpStream.c @@ -4548,7 +4548,7 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *stream, void *user) { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; - if (duk_ctx_shutting_down(state->ctx)) { return; } + if (!ILibMemory_CanaryOK(state) || duk_ctx_shutting_down(state->ctx)) { return; } duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket] duk_get_prop_string(state->ctx, -1, "decoded"); // [websocket][decoded] @@ -4653,6 +4653,15 @@ void ILibDuktape_httpStream_webSocket_DecodedEndSink(ILibDuktape_DuplexStream *s { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; ILibDuktape_httpStream_webSocket_WriteWebSocketPacket(state, WEBSOCKET_OPCODE_CLOSE, NULL, 0, ILibWebClient_WebSocket_FragmentFlag_Complete); + + // + // We need to call 'end' on the encoded stream, so that it can disconnect + // + duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket] + duk_get_prop_string(state->ctx, -1, "encoded"); // [websocket][encoded] + duk_prepare_method_call(state->ctx, -1, "end"); // [websocket][encoded][end][this] + duk_pcall_method(state->ctx, 0); duk_pop(state->ctx); // [websocket][encoded] + duk_pop_2(state->ctx); // ... } void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *user) { @@ -4678,7 +4687,7 @@ void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void * void ILibDuktape_httpStream_webSocket_DecodedPauseSink(ILibDuktape_DuplexStream *sender, void *user) { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; - if (state == NULL || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR + if (state == NULL || !ILibMemory_CanaryOK(state) || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR if (state->encodedStream->writableStream->pipedReadable_native != NULL && state->encodedStream->writableStream->pipedReadable_native->PauseHandler != NULL) { diff --git a/test/leaktest.js b/test/leaktest.js index 3ff51fe..9e2a044 100644 --- a/test/leaktest.js +++ b/test/leaktest.js @@ -16,7 +16,7 @@ limitations under the License. */ var duplex = require('stream').Duplex; var promise = require('promise'); -var http = require('http'); +var http = require('https'); var processes = []; //setModulePath('../modules'); @@ -33,7 +33,28 @@ var sample = new duplex( { } }); - +var sample2 = new duplex( + { + 'write': function (chunk, flush) + { + console.log(chunk.toString()); + flush(); + return (true); + }, + 'final': function (flush) + { + } + }); +function sample_final() +{ + console.log('Sample was finalized'); +} +function sample2_final() +{ + console.log('Sample2 was finalized'); +} +sample.once('~', sample_final); +sample2.once('~', sample2_final); if (process.platform == 'win32') { @@ -83,6 +104,9 @@ process.stdin.on('data', function (c) if (c.toString() == null) { return; } switch(c.toString().trim().toUpperCase()) { + case 'PIPE': + console.displayStreamPipeMessages = 1; + break; case 'FINAL': console.displayFinalizerMessages = 1; break; @@ -131,7 +155,9 @@ process.stdin.on('data', function (c) console.log('Web Socket Server on port: ' + global.wsserver.address().port); break; case 'WSS4433': - global.wsserver = require('http').createServer(); + console.log('Generating Cert...'); + var cert = require('tls').generateCertificate('test', { certType: 2, noUsages: 1 }); + global.wsserver = require('https').createServer({ pfx: cert, passphrase: 'test' }); global.wsserver.on('upgrade', wss_OnUpgrade); global.wsserver.listen({ port: 4433 }); console.log('Web Socket Server on port: ' + global.wsserver.address().port); @@ -141,12 +167,24 @@ process.stdin.on('data', function (c) break; case 'TUNEND': global.tun.end(); + //_debug(); global.tun.unpipe(); + if (global.ipc) { global.ipc.unpipe(); } global.tun = null; + global.ipc = null; + //sample.unpipe(); break; case 'GC': _debugGC(); break; + case 'SAMPLE': + sample.unpipe(); + sample = null; + break; + case 'SAMPLEPIPE': + sample.pipe(sample2); + console.log('Sample Piped to Sample2'); + break; } }); @@ -156,6 +194,7 @@ function wss_OnUpgrade(msg, sck, head) { case '/tunnel': this.cws = sck.upgradeWebSocket(); + this.cws.on('end', function () { console.log('Client WebSocket CLOSED'); }); console.log('Accepted Client WebSocket'); break; } @@ -175,16 +214,19 @@ function req_ws_upgrade(response, s, head) global.tun = s; - _debug(); - s.pipe(sample); - _debug(); - //global.req = null; + ////_debug(); + //sample.pipe(s); + //s.pipe(sample); + ////_debug(); + ////global.req = null; } function webSocketClientTest(port) { console.log('Initiating WebSocket'); - var woptions = http.parseUri('ws://127.0.0.1:' + port + '/tunnel'); + var woptions = http.parseUri('wss://127.0.0.1:' + port + '/tunnel'); + woptions.rejectUnauthorized = 0; + var req = http.request(woptions); req.on('upgrade', req_ws_upgrade); req.once('~', req_finalized); @@ -277,6 +319,14 @@ function startClient() _debug(); }); this.on('data', function (c) { }); + + if(global.tun!=null) + { + global.ipc = this; + console.log('Piping Together Stuff'); + global.tun.pipe(this); + this.pipe(global.tun); + } }); ret.kill = function () { @@ -320,6 +370,7 @@ function server_connection (c) c.on('data', _data); c.on('close', _close); c.on('~', _f); + global.serverc = c; } function server_closed()