diff --git a/microscript/ILibDuktape_HttpStream.c b/microscript/ILibDuktape_HttpStream.c index e46e613..9b79e37 100644 --- a/microscript/ILibDuktape_HttpStream.c +++ b/microscript/ILibDuktape_HttpStream.c @@ -4548,7 +4548,7 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *stream, void *user) { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; - if (duk_ctx_shutting_down(state->ctx)) { return; } + if (!ILibMemory_CanaryOK(state) || duk_ctx_shutting_down(state->ctx)) { return; } duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket] duk_get_prop_string(state->ctx, -1, "decoded"); // [websocket][decoded] @@ -4653,6 +4653,15 @@ void ILibDuktape_httpStream_webSocket_DecodedEndSink(ILibDuktape_DuplexStream *s { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; ILibDuktape_httpStream_webSocket_WriteWebSocketPacket(state, WEBSOCKET_OPCODE_CLOSE, NULL, 0, ILibWebClient_WebSocket_FragmentFlag_Complete); + + // + // We need to call 'end' on the encoded stream, so that it can disconnect + // + duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket] + duk_get_prop_string(state->ctx, -1, "encoded"); // [websocket][encoded] + duk_prepare_method_call(state->ctx, -1, "end"); // [websocket][encoded][end][this] + duk_pcall_method(state->ctx, 0); duk_pop(state->ctx); // [websocket][encoded] + duk_pop_2(state->ctx); // ... } void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *user) { @@ -4678,7 +4687,7 @@ void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void * void ILibDuktape_httpStream_webSocket_DecodedPauseSink(ILibDuktape_DuplexStream *sender, void *user) { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; - if (state == NULL || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR + if (state == NULL || !ILibMemory_CanaryOK(state) || state->encodedStream == NULL || state->encodedStream->writableStream == NULL) { return; } // ERROR if (state->encodedStream->writableStream->pipedReadable_native != NULL && state->encodedStream->writableStream->pipedReadable_native->PauseHandler != NULL) { diff --git a/test/leaktest.js b/test/leaktest.js index 3ff51fe..9e2a044 100644 --- a/test/leaktest.js +++ b/test/leaktest.js @@ -16,7 +16,7 @@ limitations under the License. */ var duplex = require('stream').Duplex; var promise = require('promise'); -var http = require('http'); +var http = require('https'); var processes = []; //setModulePath('../modules'); @@ -33,7 +33,28 @@ var sample = new duplex( { } }); - +var sample2 = new duplex( + { + 'write': function (chunk, flush) + { + console.log(chunk.toString()); + flush(); + return (true); + }, + 'final': function (flush) + { + } + }); +function sample_final() +{ + console.log('Sample was finalized'); +} +function sample2_final() +{ + console.log('Sample2 was finalized'); +} +sample.once('~', sample_final); +sample2.once('~', sample2_final); if (process.platform == 'win32') { @@ -83,6 +104,9 @@ process.stdin.on('data', function (c) if (c.toString() == null) { return; } switch(c.toString().trim().toUpperCase()) { + case 'PIPE': + console.displayStreamPipeMessages = 1; + break; case 'FINAL': console.displayFinalizerMessages = 1; break; @@ -131,7 +155,9 @@ process.stdin.on('data', function (c) console.log('Web Socket Server on port: ' + global.wsserver.address().port); break; case 'WSS4433': - global.wsserver = require('http').createServer(); + console.log('Generating Cert...'); + var cert = require('tls').generateCertificate('test', { certType: 2, noUsages: 1 }); + global.wsserver = require('https').createServer({ pfx: cert, passphrase: 'test' }); global.wsserver.on('upgrade', wss_OnUpgrade); global.wsserver.listen({ port: 4433 }); console.log('Web Socket Server on port: ' + global.wsserver.address().port); @@ -141,12 +167,24 @@ process.stdin.on('data', function (c) break; case 'TUNEND': global.tun.end(); + //_debug(); global.tun.unpipe(); + if (global.ipc) { global.ipc.unpipe(); } global.tun = null; + global.ipc = null; + //sample.unpipe(); break; case 'GC': _debugGC(); break; + case 'SAMPLE': + sample.unpipe(); + sample = null; + break; + case 'SAMPLEPIPE': + sample.pipe(sample2); + console.log('Sample Piped to Sample2'); + break; } }); @@ -156,6 +194,7 @@ function wss_OnUpgrade(msg, sck, head) { case '/tunnel': this.cws = sck.upgradeWebSocket(); + this.cws.on('end', function () { console.log('Client WebSocket CLOSED'); }); console.log('Accepted Client WebSocket'); break; } @@ -175,16 +214,19 @@ function req_ws_upgrade(response, s, head) global.tun = s; - _debug(); - s.pipe(sample); - _debug(); - //global.req = null; + ////_debug(); + //sample.pipe(s); + //s.pipe(sample); + ////_debug(); + ////global.req = null; } function webSocketClientTest(port) { console.log('Initiating WebSocket'); - var woptions = http.parseUri('ws://127.0.0.1:' + port + '/tunnel'); + var woptions = http.parseUri('wss://127.0.0.1:' + port + '/tunnel'); + woptions.rejectUnauthorized = 0; + var req = http.request(woptions); req.on('upgrade', req_ws_upgrade); req.once('~', req_finalized); @@ -277,6 +319,14 @@ function startClient() _debug(); }); this.on('data', function (c) { }); + + if(global.tun!=null) + { + global.ipc = this; + console.log('Piping Together Stuff'); + global.tun.pipe(this); + this.pipe(global.tun); + } }); ret.kill = function () { @@ -320,6 +370,7 @@ function server_connection (c) c.on('data', _data); c.on('close', _close); c.on('~', _f); + global.serverc = c; } function server_closed()