1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-06 00:13:33 +00:00

Updated concurrency support for Windows

This commit is contained in:
Bryan Roe
2020-10-08 16:31:55 -07:00
parent 3f6f00f915
commit 1ef348bbc4
3 changed files with 109 additions and 87 deletions

File diff suppressed because one or more lines are too long

View File

@@ -968,56 +968,15 @@ BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_Error
winIPC->endCalled = 0;
ILibDuktape_DuplexStream_Closed(winIPC->ds);
if (winIPC->endCalled == 0)
if (ILibMemory_CanaryOK(winIPC) && winIPC->endCalled == 0)
{
duk_context *_ctx = winIPC->ctx;
duk_push_heapptr(winIPC->ctx, winIPC->ds->readableStream->object); // [obj]
duk_prepare_method_call(winIPC->ctx, -1, "end"); // [obj][end][this]
if (duk_pcall_method(winIPC->ctx, 0) != 0) { ILibDuktape_Process_UncaughtExceptionEx(winIPC->ctx, "net.ipcServer.end() Error: "); }
duk_pop_2(winIPC->ctx); // ...
if (duk_pcall_method(winIPC->ctx, 0) != 0) { ILibDuktape_Process_UncaughtExceptionEx(_ctx, "net.ipcServer.end() Error: "); }
duk_pop_2(_ctx); // ...
}
if (winIPC->mServer != NULL)
{
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
if (duk_has_prop_string(winIPC->ctx, -1, ILibDuktape_net_server_closed_needEmit))
{
ILibDuktape_EventEmitter_SetupEmit(winIPC->ctx, winIPC->mServer, "close"); // [server][emit][this][close]
if (duk_pcall_method(winIPC->ctx, 1) != 0) { ILibDuktape_Process_UncaughtExceptionEx(winIPC->ctx, "net.ipcServer.onClose() Error: "); }
duk_pop(winIPC->ctx); // [server]
}
duk_pop(winIPC->ctx); // ...
}
duk_context *_ctx = winIPC->ctx;
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
duk_prepare_method_call(winIPC->ctx, -1, "indexOf"); // [server][array][indexOf][this]
duk_push_heapptr(winIPC->ctx, winIPC->ipcreserved); // [server][array][indexOf][this][buffer]
if (duk_pcall_method(winIPC->ctx, 1) == 0)
{
int ix = duk_get_int(winIPC->ctx, -1); // [server][array][index]
int setup = 0;
if (ix >= 0)
{
if (duk_get_length(_ctx, -2) == Duktape_GetIntPropertyValue(_ctx, -3, ILibDuktape_net_ConcurrencyMaxSize, -1))
{
// We are at the maximum number of concurrent sessions, so after we remove ourselves from the list
// we must prepare for another connection
setup = 1;
}
duk_array_remove(winIPC->ctx, -2, ix);
if (setup != 0)
{
duk_prepare_method_call(_ctx, -3, "listen"); // [server][array][index][listen][this]
duk_get_prop_string(_ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS);// [server][array][index][listen][this][options]
duk_pcall_method(_ctx, 1); duk_pop(_ctx); // [server][array][index]
}
}
}
duk_pop_3(_ctx); // ...
return(FALSE);
}
}
@@ -1157,22 +1116,60 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void *
if (winIPC->read_overlapped.hEvent != NULL) { CloseHandle(winIPC->read_overlapped.hEvent); winIPC->read_overlapped.hEvent = NULL; }
if (winIPC->write_overlapped.hEvent != NULL) { CloseHandle(winIPC->write_overlapped.hEvent); winIPC->write_overlapped.hEvent = NULL; }
if (winIPC->mServer != NULL && winIPC->backlog == 0)
if (winIPC != NULL && winIPC->mServer != NULL)
{
// Server IPC, so we can create a new Instance, and listen for a connection
// We need to dereference this, because winIPC will go out of scope when we call listen
duk_context *ctx = winIPC->ctx;
CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL;
duk_context *_ctx = winIPC->ctx;
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
int needEmitClose = Duktape_GetBooleanProperty(winIPC->ctx, -1, ILibDuktape_net_server_closed_needEmit, 0);
duk_push_heapptr(ctx, winIPC->mServer); // [server]
if (Duktape_GetBooleanProperty(ctx, -1, ILibDuktape_net_server_closed_needEmit, 0) == 0)
duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
duk_prepare_method_call(winIPC->ctx, -1, "indexOf"); // [server][array][indexOf][this]
duk_push_heapptr(winIPC->ctx, winIPC->ipcreserved); // [server][array][indexOf][this][buffer]
if (duk_pcall_method(winIPC->ctx, 1) == 0) // [server][array][index]
{
duk_get_prop_string(ctx, -1, "listen"); // [server][listen]
duk_swap_top(ctx, -2); // [listen][this]
duk_get_prop_string(ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options]
duk_pcall_method(ctx, 1); // [ret]
int ix = duk_get_int(winIPC->ctx, -1);
if (ix >= 0)
{
duk_uarridx_t numObjects = (duk_uarridx_t)duk_get_length(_ctx, -2);
int maxLen = Duktape_GetIntPropertyValue(_ctx, -3, ILibDuktape_net_ConcurrencyMaxSize, 1);
duk_uarridx_t z;
int setup = numObjects == maxLen ? 1 : 0;
int connected = 0;
for (z = 0; z < numObjects; ++z)
{
duk_get_prop_index(winIPC->ctx, -2, z); // [server][array][index][winIPC]
if (z != ix && ((ILibDuktape_net_WindowsIPC*)Duktape_GetBuffer(winIPC->ctx, -1, NULL))->clientConnected == FALSE)
{
setup = 0;
}
if (((ILibDuktape_net_WindowsIPC*)Duktape_GetBuffer(winIPC->ctx, -1, NULL))->clientConnected == TRUE)
{
++connected;
}
duk_pop(winIPC->ctx); // [server][array][index]
}
duk_array_remove(winIPC->ctx, -2, ix);
if (setup != 0 && needEmitClose == 0)
{
duk_prepare_method_call(_ctx, -3, "listen"); // [server][array][index][listen][this]
duk_get_prop_string(_ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS);// [server][array][index][listen][this][options]
duk_pcall_method(_ctx, 1); duk_pop(_ctx); // [server][array][index]
}
else
{
if (needEmitClose != 0 && connected == 0)
{
// All connections are now closed, so we can emit 'close'
ILibDuktape_EventEmitter_SetupEmitEx(_ctx, -3, "close"); // [server][array][index][emit][this][close]
duk_pcall_method(_ctx, 1); duk_pop(_ctx); // [server][array][index]
}
}
}
}
duk_pop(ctx); // ...
duk_pop_3(_ctx); // ...
}
}
duk_ret_t ILibDuktape_net_server_IPC_ConnectSink_Finalizer(duk_context *ctx)
@@ -1386,7 +1383,8 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
duk_array_push(ctx, -3); // [server][array][buffer]
duk_remove(ctx, -2); // [server][buffer]
winIPC->ipcreserved = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, ILibDuktape_net_WindowsIPC_Buffer);
duk_pop(ctx); // [server]
winIPC->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
winIPC->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
winIPC->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
@@ -1432,7 +1430,6 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
{
DWORD err = GetLastError();
CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL;
duk_del_prop_string(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer);
return(ILibDuktape_Error(ctx, "Error Creating Named Pipe: %s", ipc));
}
//printf("ConnectNamedPipe(%s)\n", ipc);
@@ -1538,12 +1535,19 @@ duk_ret_t ILibDuktape_net_server_Finalizer(duk_context *ctx)
}
#ifdef WIN32
ILibDuktape_net_WindowsIPC *ipc = Duktape_GetBufferProperty(ctx, 0, ILibDuktape_net_WindowsIPC_Buffer);
if (ipc != NULL && ipc->overlapped.hEvent != NULL)
ILibDuktape_net_WindowsIPC *ipc = NULL;
duk_get_prop_string(ctx, 0, ILibDuktape_net_ConcurrencyArray); // [array]
while (duk_get_length(ctx, -1) > 0)
{
ILibChain_RemoveWaitHandle(duk_ctx_chain(ctx), ipc->overlapped.hEvent);
if (ipc->mPipeHandle != NULL) { CloseHandle(ipc->mPipeHandle); ipc->mPipeHandle = NULL; }
if (ipc->overlapped.hEvent != NULL) { CloseHandle(ipc->overlapped.hEvent); ipc->overlapped.hEvent = NULL; }
duk_array_pop(ctx, -1); // [array][winipc]
ipc = (ILibDuktape_net_WindowsIPC*)Duktape_GetBuffer(ctx, -1, NULL);
if (ipc != NULL && ipc->overlapped.hEvent != NULL)
{
ILibChain_RemoveWaitHandle(duk_ctx_chain(ctx), ipc->overlapped.hEvent);
if (ipc->mPipeHandle != NULL) { CloseHandle(ipc->mPipeHandle); ipc->mPipeHandle = NULL; }
if (ipc->overlapped.hEvent != NULL) { CloseHandle(ipc->overlapped.hEvent); ipc->overlapped.hEvent = NULL; }
}
duk_pop(ctx); // [array]
}
#endif
@@ -1615,28 +1619,48 @@ duk_ret_t ILibDuktape_net_server_close(duk_context *ctx)
#ifdef WIN32
else
{
duk_push_this(ctx);
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_GetBufferProperty(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer);
if (winIPC != NULL && winIPC->mPipeHandle != NULL)
{
if (winIPC->clientConnected == FALSE)
{
// Listening
DisconnectNamedPipe(winIPC->mPipeHandle);
CancelIoEx(winIPC->mPipeHandle, NULL);
CloseHandle(winIPC->mPipeHandle);
winIPC->mPipeHandle = NULL;
duk_push_this(ctx); // [server]
duk_get_prop_string(ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
ILibDuktape_EventEmitter_SetupEmit(ctx, server->self, "close"); // [emit][this][close]
duk_call_method(ctx, 1);
}
else
int connections = 0;
ILibDuktape_net_WindowsIPC *winIPC = NULL;
duk_uarridx_t i;
duk_size_t len = duk_get_length(ctx, -1);
for (i = 0; i < len; ++i)
{
duk_get_prop_index(ctx, -1, i); // [server][array][winipc]
winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_GetBuffer(ctx, -1, NULL);
if (winIPC != NULL && winIPC->mPipeHandle != NULL)
{
// Connected
duk_push_this(ctx);
duk_push_true(ctx);
duk_put_prop_string(ctx, -2, ILibDuktape_net_server_closed_needEmit);
if (winIPC->clientConnected == FALSE)
{
// This object is listening for a new connection
DisconnectNamedPipe(winIPC->mPipeHandle);
CancelIoEx(winIPC->mPipeHandle, NULL);
CloseHandle(winIPC->mPipeHandle);
winIPC->mPipeHandle = NULL;
ILibChain_RemoveWaitHandle(duk_ctx_chain(ctx), winIPC->overlapped.hEvent);
}
else
{
++connections;
}
}
duk_pop(ctx); // [server][array]
}
if (connections == 0)
{
// No active connections, so we can emit 'close' now
ILibDuktape_EventEmitter_SetupEmit(ctx, server->self, "close"); // [emit][this][close]
duk_call_method(ctx, 1);
}
else
{
// Set a flag, so we emit this when all connections are closed
duk_push_this(ctx);
duk_push_true(ctx);
duk_put_prop_string(ctx, -2, ILibDuktape_net_server_closed_needEmit);
}
}
#endif

View File

@@ -34,13 +34,11 @@ function queryAgent(obj, prev)
{
var ret = new promise(function (res, rej) { this._res = res; this._rej = rej; });
ret._obj = { cmd: 'query', value: obj };
console.log(obj, prev);
if (prev == null)
{
ret.client = require('net').createConnection({ path: ipcPath });
ret.client.on('connect', function ()
{
console.log('ON CONNECT');
this.on('data', dataHandler);
this.on('end', function ()
{
@@ -89,7 +87,7 @@ function start()
if (res == null) { res = '[NOT CONNECTED]'; }
console.log('Mesh Agent connected to: ' + res);
return (queryAgent('descriptors', connection));
}).then(console.log).then(function () { process._exit(); }).catch(function () { process._exit(); });
}).then(function (v) { console.log(v); }).then(function () { process._exit(); }).catch(function () { process._exit(); });
}
module.exports = { start: start };