diff --git a/microscript/ILibDuktape_HttpStream.c b/microscript/ILibDuktape_HttpStream.c index 881a687..e46e613 100644 --- a/microscript/ILibDuktape_HttpStream.c +++ b/microscript/ILibDuktape_HttpStream.c @@ -619,6 +619,7 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx) duk_get_prop_string(ctx, -1, ILibDuktape_WS2CR); // [HTTPStream][websocket][request] duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [HTTPStream][websocket][request][removeAll][this] duk_call_method(ctx, 0); duk_pop_2(ctx); // [HTTPStream][websocket] + duk_del_prop_string(ctx, -1, ILibDuktape_WS2CR); } return(0); @@ -4559,6 +4560,10 @@ void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *s ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "encoded"); duk_pop(state->ctx); // ... + duk_push_this(state->ctx); // [encoded] + duk_del_prop_string(state->ctx, -1, ILibDuktape_WSENC2WS); + duk_pop(state->ctx); // ... + if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); } } void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user) @@ -4803,6 +4808,10 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_encoded_Finalizer(duk_context * } + return(0); +} +duk_ret_t ILibDuktape_httpStream_webSocketStream_decoded_Finalizer(duk_context *ctx) +{ return(0); } #ifdef _SSL_KEYS_EXPORTABLE @@ -4830,11 +4839,13 @@ void ILibDuktape_httpStream_webSocketStream_descriptorMetadataEx(duk_context *ct if(ws->encodedStream->writableStream->pipedReadable != NULL) { + duk_idx_t top = duk_get_top(ctx); duk_push_heapptr(ctx, ws->encodedStream->writableStream->pipedReadable); // [WebSocket_Decoded][WebSocket][Readable] char * tmp = (char*)duk_push_sprintf(ctx, "%s, %s", ILibChain_Link_GetMetadata(Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChainLinkPtr)), str); char *tmp2 = ILibMemory_SmartAllocate(duk_get_length(ctx, -1) + 1); // [WebSocket_Decoded][WebSocket][Readable][str] memcpy_s(tmp2, ILibMemory_Size(tmp2), tmp, ILibMemory_Size(tmp2) - 1); ILibChain_Link_SetMetadata(Duktape_GetPointerProperty(ctx, -2, ILibDuktape_ChainLinkPtr), tmp2); + duk_set_top(ctx, top); } } @@ -5020,6 +5031,7 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx) ILibDuktape_WriteID(ctx, "http.WebSocketStream.decoded"); duk_dup(ctx, -2); // [WebSocket][Decoded][WebSocket] duk_put_prop_string(ctx, -2, ILibDuktape_WSDEC2WS); // [WebSocket][Decoded] + state->decodedStream = ILibDuktape_DuplexStream_InitEx(ctx, ILibDuktape_httpStream_webSocket_DecodedWriteSink, ILibDuktape_httpStream_webSocket_DecodedEndSink, ILibDuktape_httpStream_webSocket_DecodedPauseSink, ILibDuktape_httpStream_webSocket_DecodedResumeSink, ILibDuktape_httpStream_webSocket_DecodedUnshiftSink, state); ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "ping"); ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "pong"); @@ -5038,6 +5050,10 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx) ILibDuktape_CreateEventWithGetter(ctx, "bytesSent_ratio", ILibDuktape_WebSocket_bytesSent_ratio); ILibDuktape_CreateEventWithGetter(ctx, "bytesReceived_ratio", ILibDuktape_WebSocket_bytesReceived_ratio); + ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "~", ILibDuktape_httpStream_webSocketStream_decoded_Finalizer); + + + ILibDuktape_CreateReadonlyProperty(ctx, "decoded"); // [WebSocket] ILibDuktape_CreateFinalizer(ctx, ILibDuktape_httpStream_webSocketStream_finalizer); return(1); diff --git a/test/leaktest.js b/test/leaktest.js index 74bbce5..3ff51fe 100644 --- a/test/leaktest.js +++ b/test/leaktest.js @@ -14,11 +14,27 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +var duplex = require('stream').Duplex; var promise = require('promise'); +var http = require('http'); var processes = []; + //setModulePath('../modules'); +var sample = new duplex( + { + 'write': function (chunk, flush) + { + console.log(chunk.toString()); + flush(); + return (true); + }, + 'final': function (flush) + { + } + }); + + if (process.platform == 'win32') { global.kernel32 = require('_GenericMarshal').CreateNativeProxy('kernel32.dll'); @@ -26,6 +42,10 @@ if (process.platform == 'win32') global.kernel32.CreateMethod('GetProcessHandleCount'); } +function empty_function() +{ +} + function getHandleCount() { if (process.platform != 'win32') { return (0); } @@ -55,11 +75,17 @@ console.log(' end = Close spawned process'); console.log(' exit = Exit Test'); console.log(' Current Handle Count => ' + getHandleCount()); console.log('\n'); + + + process.stdin.on('data', function (c) { if (c.toString() == null) { return; } switch(c.toString().trim().toUpperCase()) { + case 'FINAL': + console.displayFinalizerMessages = 1; + break; case 'VERBOSE': console.setInfoLevel(1); console.info1('SetInfoLevel'); @@ -76,6 +102,9 @@ process.stdin.on('data', function (c) case 'DISPATCH': startDispatch(); break; + case 'ENDDISPATCH': + stopDispatch(); + break; case 'SERVER': startServer(); break; @@ -95,9 +124,74 @@ process.stdin.on('data', function (c) console.log('PS Capable = ' + require('win-virtual-terminal').PowerShellCapable()); console.log('ConPTY = ' + require('win-virtual-terminal').supported); break; + case 'WSS': + global.wsserver = require('http').createServer(); + global.wsserver.on('upgrade', wss_OnUpgrade); + global.wsserver.listen(); + console.log('Web Socket Server on port: ' + global.wsserver.address().port); + break; + case 'WSS4433': + global.wsserver = require('http').createServer(); + global.wsserver.on('upgrade', wss_OnUpgrade); + global.wsserver.listen({ port: 4433 }); + console.log('Web Socket Server on port: ' + global.wsserver.address().port); + break; + case 'WSC': + webSocketClientTest(global.wsserver != null ? global.wsserver.address().port : 4433); + break; + case 'TUNEND': + global.tun.end(); + global.tun.unpipe(); + global.tun = null; + break; + case 'GC': + _debugGC(); + break; } }); +function wss_OnUpgrade(msg, sck, head) +{ + switch (msg.url) + { + case '/tunnel': + this.cws = sck.upgradeWebSocket(); + console.log('Accepted Client WebSocket'); + break; + } +} +function req_finalized() +{ + console.log('Client Request Finalized'); +} +function ws_finalized() +{ + console.log('Client WebSocket finalized'); +} +function req_ws_upgrade(response, s, head) +{ + console.log('Client Web Socket Connected', s._ObjectID); + s.once('~', ws_finalized); + + global.tun = s; + + _debug(); + s.pipe(sample); + _debug(); + //global.req = null; +} +function webSocketClientTest(port) +{ + console.log('Initiating WebSocket'); + + var woptions = http.parseUri('ws://127.0.0.1:' + port + '/tunnel'); + var req = http.request(woptions); + req.on('upgrade', req_ws_upgrade); + req.once('~', req_finalized); + req.end(); + +} + function regTest() { var reg = require('win-registry'); @@ -198,6 +292,41 @@ function startClient() console.log(' Current Handle Count => ' + getHandleCount()); } +function timeouthandler() +{ + console.log('Connection => ', global.connection_ref.eval()); + //_debugGC(); +} + +function _data(b) +{ + console.log(b.toString()); +} +function _close() +{ + console.log('Client Closed'); + global._t = setTimeout(timeouthandler, 2000); +} +function _f() +{ + console.log('Connection Finalized'); +} + +function server_connection (c) +{ + //this.parent._connection = c; + global.connection_ref = require('events')._refCountPointer(c); + console.log('Connection => ', global.connection_ref.eval()); + c.on('data', _data); + c.on('close', _close); + c.on('~', _f); +} + +function server_closed() +{ + +} + function startServer() { console.log(' Current Handle Count => ' + getHandleCount()); @@ -205,12 +334,8 @@ function startServer() var ipcInteger; var ret = new promise(promise.defaultInit); ret._ipc = require('net').createServer(); ret._ipc.parent = ret; - ret._ipc.on('close', function () { }); - ret._ipc.on('connection', function (c) - { - this.parent._connection = c; - c.on('data', function (b) { console.log(b.toString()); }); - }); + ret._ipc.on('close', server_closed); + ret._ipc.on('connection', server_connection); while (true) { @@ -230,6 +355,8 @@ function startServer() ret.kill = function () { this._ipc.close(); + this._ipc._connection.parent = null; + this._ipc._connection = null; }; processes.push(ret); console.log(' Current Handle Count => ' + getHandleCount()); @@ -255,34 +382,78 @@ function startProcess() processes.push(c); console.log('HandleCount => ' + getHandleCount()); } + +function startDispatch_kill() +{ + console.log('Calling kill'); + this.dispatcher.invoke('kill', []); +} + +function startDispatch_then() +{ + this.connection = c; + c.on('data', function (b) { console.log(b.toString()); }); + +} + +function dispatch_Ondata(x) +{ + process.stdout.write(x); +} +function dispatch_OnFinal() +{ + console.log('Client Connection Finalized'); +} +function dispatch_OnEnd() +{ + console.log('Connected Ended'); +} +function startDispatch_connect(c) +{ + console.log('TERMINAL CONNECTED'); + + //console.logReferenceCount(c); + this.term = c; + //c.on('data', dispatch_Ondata); + c.on('~', dispatch_OnFinal); + //c.on('end', dispatch_OnEnd); + + c.pipe(sample); +} +function startDispatch_final() +{ + console.log('Dispatcher Finalized'); +} + function startDispatch() { var p = new promise(promise.defaultInit); - p.dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'test_stream', script: getJSModule('test_stream') }], launch: { module: 'test_stream', method: 'start', args: [] } }); - p.dispatcher.promise = p; - p.dispatcher.on('connection', function (c) { console.log('CONNECTED'); if (this.promise.completed) { c.end(); } else { c.on('end', function () { console.log('ENDED'); }); this.promise.resolve(c); } }); - p.kill = function () - { - console.log('Calling kill'); - this.dispatcher.invoke('kill', []); - }; + + //p.dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'test_stream', script: getJSModule('test_stream') }], launch: { module: 'test_stream', method: 'start', args: [] } }); + //p.dispatcher.promise = p; + //p.dispatcher.on('connection', function (c) { console.log('CONNECTED'); if (this.promise.completed) { c.end(); } else { c.on('end', function () { console.log('ENDED'); }); this.promise.resolve(c); } }); + //p.kill = startDispatch_kill; + + //processes.push(p); + //p.then(startDispatch_then); + + p._dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'win-virtual-terminal', script: getJSModule('win-virtual-terminal') }], launch: { module: 'win-virtual-terminal', method: 'Start', args: [80, 25] } }); + p._dispatcher.httprequest = this.httprequest; + p._dispatcher.on('connection', startDispatch_connect); + p._dispatcher.on('~', startDispatch_final); processes.push(p); - p.then(function (c) +} +function stopDispatch() +{ + var p = processes.shift(); + if(p!=null) { - this.connection = c; - c.on('data', function (b) { console.log(b.toString()); }); - }); + console.log('Ending Connection'); + p._dispatcher.term.end(); + p._dispatcher.term.unpipe(); + p._dispatcher.term = null; + p._dispatcher = null; + } - - - //p._dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'win-virtual-terminal', script: getJSModule('win-virtual-terminal') }], launch: { module: 'win-virtual-terminal', method: 'Start', args: [80, 25] } }); - //p._dispatcher.httprequest = this.httprequest; - //p._dispatcher.on('connection', function (c) - //{ - // console.log('TERMINAL CONNECTED'); - // p.term = c; - // c.on('data', function (x) { process.stdout.write(x); }); - //}); - -} \ No newline at end of file +}