1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-15 07:43:50 +00:00

Optimized references in HTTP WebSockets, and added more leaktest scenarios.

This commit is contained in:
Bryan Roe
2022-08-18 01:39:59 -07:00
parent d4864597ae
commit 905aafd9b1
2 changed files with 218 additions and 31 deletions

View File

@@ -619,6 +619,7 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
duk_get_prop_string(ctx, -1, ILibDuktape_WS2CR); // [HTTPStream][websocket][request] duk_get_prop_string(ctx, -1, ILibDuktape_WS2CR); // [HTTPStream][websocket][request]
duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [HTTPStream][websocket][request][removeAll][this] duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [HTTPStream][websocket][request][removeAll][this]
duk_call_method(ctx, 0); duk_pop_2(ctx); // [HTTPStream][websocket] duk_call_method(ctx, 0); duk_pop_2(ctx); // [HTTPStream][websocket]
duk_del_prop_string(ctx, -1, ILibDuktape_WS2CR);
} }
return(0); return(0);
@@ -4559,6 +4560,10 @@ void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *s
ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "encoded"); ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "encoded");
duk_pop(state->ctx); // ... duk_pop(state->ctx); // ...
duk_push_this(state->ctx); // [encoded]
duk_del_prop_string(state->ctx, -1, ILibDuktape_WSENC2WS);
duk_pop(state->ctx); // ...
if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); } if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); }
} }
void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user) void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user)
@@ -4803,6 +4808,10 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_encoded_Finalizer(duk_context *
} }
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_decoded_Finalizer(duk_context *ctx)
{
return(0); return(0);
} }
#ifdef _SSL_KEYS_EXPORTABLE #ifdef _SSL_KEYS_EXPORTABLE
@@ -4830,11 +4839,13 @@ void ILibDuktape_httpStream_webSocketStream_descriptorMetadataEx(duk_context *ct
if(ws->encodedStream->writableStream->pipedReadable != NULL) if(ws->encodedStream->writableStream->pipedReadable != NULL)
{ {
duk_idx_t top = duk_get_top(ctx);
duk_push_heapptr(ctx, ws->encodedStream->writableStream->pipedReadable); // [WebSocket_Decoded][WebSocket][Readable] duk_push_heapptr(ctx, ws->encodedStream->writableStream->pipedReadable); // [WebSocket_Decoded][WebSocket][Readable]
char * tmp = (char*)duk_push_sprintf(ctx, "%s, %s", ILibChain_Link_GetMetadata(Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChainLinkPtr)), str); char * tmp = (char*)duk_push_sprintf(ctx, "%s, %s", ILibChain_Link_GetMetadata(Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChainLinkPtr)), str);
char *tmp2 = ILibMemory_SmartAllocate(duk_get_length(ctx, -1) + 1); // [WebSocket_Decoded][WebSocket][Readable][str] char *tmp2 = ILibMemory_SmartAllocate(duk_get_length(ctx, -1) + 1); // [WebSocket_Decoded][WebSocket][Readable][str]
memcpy_s(tmp2, ILibMemory_Size(tmp2), tmp, ILibMemory_Size(tmp2) - 1); memcpy_s(tmp2, ILibMemory_Size(tmp2), tmp, ILibMemory_Size(tmp2) - 1);
ILibChain_Link_SetMetadata(Duktape_GetPointerProperty(ctx, -2, ILibDuktape_ChainLinkPtr), tmp2); ILibChain_Link_SetMetadata(Duktape_GetPointerProperty(ctx, -2, ILibDuktape_ChainLinkPtr), tmp2);
duk_set_top(ctx, top);
} }
} }
@@ -5020,6 +5031,7 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
ILibDuktape_WriteID(ctx, "http.WebSocketStream.decoded"); ILibDuktape_WriteID(ctx, "http.WebSocketStream.decoded");
duk_dup(ctx, -2); // [WebSocket][Decoded][WebSocket] duk_dup(ctx, -2); // [WebSocket][Decoded][WebSocket]
duk_put_prop_string(ctx, -2, ILibDuktape_WSDEC2WS); // [WebSocket][Decoded] duk_put_prop_string(ctx, -2, ILibDuktape_WSDEC2WS); // [WebSocket][Decoded]
state->decodedStream = ILibDuktape_DuplexStream_InitEx(ctx, ILibDuktape_httpStream_webSocket_DecodedWriteSink, ILibDuktape_httpStream_webSocket_DecodedEndSink, ILibDuktape_httpStream_webSocket_DecodedPauseSink, ILibDuktape_httpStream_webSocket_DecodedResumeSink, ILibDuktape_httpStream_webSocket_DecodedUnshiftSink, state); state->decodedStream = ILibDuktape_DuplexStream_InitEx(ctx, ILibDuktape_httpStream_webSocket_DecodedWriteSink, ILibDuktape_httpStream_webSocket_DecodedEndSink, ILibDuktape_httpStream_webSocket_DecodedPauseSink, ILibDuktape_httpStream_webSocket_DecodedResumeSink, ILibDuktape_httpStream_webSocket_DecodedUnshiftSink, state);
ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "ping"); ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "ping");
ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "pong"); ILibDuktape_EventEmitter_CreateEventEx(ILibDuktape_EventEmitter_GetEmitter(ctx, -1), "pong");
@@ -5038,6 +5050,10 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
ILibDuktape_CreateEventWithGetter(ctx, "bytesSent_ratio", ILibDuktape_WebSocket_bytesSent_ratio); ILibDuktape_CreateEventWithGetter(ctx, "bytesSent_ratio", ILibDuktape_WebSocket_bytesSent_ratio);
ILibDuktape_CreateEventWithGetter(ctx, "bytesReceived_ratio", ILibDuktape_WebSocket_bytesReceived_ratio); ILibDuktape_CreateEventWithGetter(ctx, "bytesReceived_ratio", ILibDuktape_WebSocket_bytesReceived_ratio);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "~", ILibDuktape_httpStream_webSocketStream_decoded_Finalizer);
ILibDuktape_CreateReadonlyProperty(ctx, "decoded"); // [WebSocket] ILibDuktape_CreateReadonlyProperty(ctx, "decoded"); // [WebSocket]
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_httpStream_webSocketStream_finalizer); ILibDuktape_CreateFinalizer(ctx, ILibDuktape_httpStream_webSocketStream_finalizer);
return(1); return(1);

View File

@@ -14,11 +14,27 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
var duplex = require('stream').Duplex;
var promise = require('promise'); var promise = require('promise');
var http = require('http');
var processes = []; var processes = [];
//setModulePath('../modules'); //setModulePath('../modules');
var sample = new duplex(
{
'write': function (chunk, flush)
{
console.log(chunk.toString());
flush();
return (true);
},
'final': function (flush)
{
}
});
if (process.platform == 'win32') if (process.platform == 'win32')
{ {
global.kernel32 = require('_GenericMarshal').CreateNativeProxy('kernel32.dll'); global.kernel32 = require('_GenericMarshal').CreateNativeProxy('kernel32.dll');
@@ -26,6 +42,10 @@ if (process.platform == 'win32')
global.kernel32.CreateMethod('GetProcessHandleCount'); global.kernel32.CreateMethod('GetProcessHandleCount');
} }
function empty_function()
{
}
function getHandleCount() function getHandleCount()
{ {
if (process.platform != 'win32') { return (0); } if (process.platform != 'win32') { return (0); }
@@ -55,11 +75,17 @@ console.log(' end = Close spawned process');
console.log(' exit = Exit Test'); console.log(' exit = Exit Test');
console.log(' Current Handle Count => ' + getHandleCount()); console.log(' Current Handle Count => ' + getHandleCount());
console.log('\n'); console.log('\n');
process.stdin.on('data', function (c) process.stdin.on('data', function (c)
{ {
if (c.toString() == null) { return; } if (c.toString() == null) { return; }
switch(c.toString().trim().toUpperCase()) switch(c.toString().trim().toUpperCase())
{ {
case 'FINAL':
console.displayFinalizerMessages = 1;
break;
case 'VERBOSE': case 'VERBOSE':
console.setInfoLevel(1); console.setInfoLevel(1);
console.info1('SetInfoLevel'); console.info1('SetInfoLevel');
@@ -76,6 +102,9 @@ process.stdin.on('data', function (c)
case 'DISPATCH': case 'DISPATCH':
startDispatch(); startDispatch();
break; break;
case 'ENDDISPATCH':
stopDispatch();
break;
case 'SERVER': case 'SERVER':
startServer(); startServer();
break; break;
@@ -95,9 +124,74 @@ process.stdin.on('data', function (c)
console.log('PS Capable = ' + require('win-virtual-terminal').PowerShellCapable()); console.log('PS Capable = ' + require('win-virtual-terminal').PowerShellCapable());
console.log('ConPTY = ' + require('win-virtual-terminal').supported); console.log('ConPTY = ' + require('win-virtual-terminal').supported);
break; break;
case 'WSS':
global.wsserver = require('http').createServer();
global.wsserver.on('upgrade', wss_OnUpgrade);
global.wsserver.listen();
console.log('Web Socket Server on port: ' + global.wsserver.address().port);
break;
case 'WSS4433':
global.wsserver = require('http').createServer();
global.wsserver.on('upgrade', wss_OnUpgrade);
global.wsserver.listen({ port: 4433 });
console.log('Web Socket Server on port: ' + global.wsserver.address().port);
break;
case 'WSC':
webSocketClientTest(global.wsserver != null ? global.wsserver.address().port : 4433);
break;
case 'TUNEND':
global.tun.end();
global.tun.unpipe();
global.tun = null;
break;
case 'GC':
_debugGC();
break;
} }
}); });
function wss_OnUpgrade(msg, sck, head)
{
switch (msg.url)
{
case '/tunnel':
this.cws = sck.upgradeWebSocket();
console.log('Accepted Client WebSocket');
break;
}
}
function req_finalized()
{
console.log('Client Request Finalized');
}
function ws_finalized()
{
console.log('Client WebSocket finalized');
}
function req_ws_upgrade(response, s, head)
{
console.log('Client Web Socket Connected', s._ObjectID);
s.once('~', ws_finalized);
global.tun = s;
_debug();
s.pipe(sample);
_debug();
//global.req = null;
}
function webSocketClientTest(port)
{
console.log('Initiating WebSocket');
var woptions = http.parseUri('ws://127.0.0.1:' + port + '/tunnel');
var req = http.request(woptions);
req.on('upgrade', req_ws_upgrade);
req.once('~', req_finalized);
req.end();
}
function regTest() function regTest()
{ {
var reg = require('win-registry'); var reg = require('win-registry');
@@ -198,6 +292,41 @@ function startClient()
console.log(' Current Handle Count => ' + getHandleCount()); console.log(' Current Handle Count => ' + getHandleCount());
} }
function timeouthandler()
{
console.log('Connection => ', global.connection_ref.eval());
//_debugGC();
}
function _data(b)
{
console.log(b.toString());
}
function _close()
{
console.log('Client Closed');
global._t = setTimeout(timeouthandler, 2000);
}
function _f()
{
console.log('Connection Finalized');
}
function server_connection (c)
{
//this.parent._connection = c;
global.connection_ref = require('events')._refCountPointer(c);
console.log('Connection => ', global.connection_ref.eval());
c.on('data', _data);
c.on('close', _close);
c.on('~', _f);
}
function server_closed()
{
}
function startServer() function startServer()
{ {
console.log(' Current Handle Count => ' + getHandleCount()); console.log(' Current Handle Count => ' + getHandleCount());
@@ -205,12 +334,8 @@ function startServer()
var ipcInteger; var ipcInteger;
var ret = new promise(promise.defaultInit); var ret = new promise(promise.defaultInit);
ret._ipc = require('net').createServer(); ret._ipc.parent = ret; ret._ipc = require('net').createServer(); ret._ipc.parent = ret;
ret._ipc.on('close', function () { }); ret._ipc.on('close', server_closed);
ret._ipc.on('connection', function (c) ret._ipc.on('connection', server_connection);
{
this.parent._connection = c;
c.on('data', function (b) { console.log(b.toString()); });
});
while (true) while (true)
{ {
@@ -230,6 +355,8 @@ function startServer()
ret.kill = function () ret.kill = function ()
{ {
this._ipc.close(); this._ipc.close();
this._ipc._connection.parent = null;
this._ipc._connection = null;
}; };
processes.push(ret); processes.push(ret);
console.log(' Current Handle Count => ' + getHandleCount()); console.log(' Current Handle Count => ' + getHandleCount());
@@ -255,34 +382,78 @@ function startProcess()
processes.push(c); processes.push(c);
console.log('HandleCount => ' + getHandleCount()); console.log('HandleCount => ' + getHandleCount());
} }
function startDispatch()
{ function startDispatch_kill()
var p = new promise(promise.defaultInit);
p.dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'test_stream', script: getJSModule('test_stream') }], launch: { module: 'test_stream', method: 'start', args: [] } });
p.dispatcher.promise = p;
p.dispatcher.on('connection', function (c) { console.log('CONNECTED'); if (this.promise.completed) { c.end(); } else { c.on('end', function () { console.log('ENDED'); }); this.promise.resolve(c); } });
p.kill = function ()
{ {
console.log('Calling kill'); console.log('Calling kill');
this.dispatcher.invoke('kill', []); this.dispatcher.invoke('kill', []);
}; }
processes.push(p);
p.then(function (c) function startDispatch_then()
{ {
this.connection = c; this.connection = c;
c.on('data', function (b) { console.log(b.toString()); }); c.on('data', function (b) { console.log(b.toString()); });
});
}
function dispatch_Ondata(x)
{
//p._dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'win-virtual-terminal', script: getJSModule('win-virtual-terminal') }], launch: { module: 'win-virtual-terminal', method: 'Start', args: [80, 25] } }); process.stdout.write(x);
//p._dispatcher.httprequest = this.httprequest; }
//p._dispatcher.on('connection', function (c) function dispatch_OnFinal()
//{ {
// console.log('TERMINAL CONNECTED'); console.log('Client Connection Finalized');
// p.term = c; }
// c.on('data', function (x) { process.stdout.write(x); }); function dispatch_OnEnd()
//}); {
console.log('Connected Ended');
}
function startDispatch_connect(c)
{
console.log('TERMINAL CONNECTED');
//console.logReferenceCount(c);
this.term = c;
//c.on('data', dispatch_Ondata);
c.on('~', dispatch_OnFinal);
//c.on('end', dispatch_OnEnd);
c.pipe(sample);
}
function startDispatch_final()
{
console.log('Dispatcher Finalized');
}
function startDispatch()
{
var p = new promise(promise.defaultInit);
//p.dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'test_stream', script: getJSModule('test_stream') }], launch: { module: 'test_stream', method: 'start', args: [] } });
//p.dispatcher.promise = p;
//p.dispatcher.on('connection', function (c) { console.log('CONNECTED'); if (this.promise.completed) { c.end(); } else { c.on('end', function () { console.log('ENDED'); }); this.promise.resolve(c); } });
//p.kill = startDispatch_kill;
//processes.push(p);
//p.then(startDispatch_then);
p._dispatcher = require('win-dispatcher').dispatch({ modules: [{ name: 'win-virtual-terminal', script: getJSModule('win-virtual-terminal') }], launch: { module: 'win-virtual-terminal', method: 'Start', args: [80, 25] } });
p._dispatcher.httprequest = this.httprequest;
p._dispatcher.on('connection', startDispatch_connect);
p._dispatcher.on('~', startDispatch_final);
processes.push(p);
}
function stopDispatch()
{
var p = processes.shift();
if(p!=null)
{
console.log('Ending Connection');
p._dispatcher.term.end();
p._dispatcher.term.unpipe();
p._dispatcher.term = null;
p._dispatcher = null;
}
} }