1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-22 03:03:18 +00:00

Added support for IPC concurrency on Windows

This commit is contained in:
Bryan Roe
2020-10-06 17:56:44 -07:00
parent c7b35ae9f1
commit 3f6f00f915
2 changed files with 142 additions and 39 deletions

View File

@@ -85,9 +85,12 @@ typedef struct ILibDuktape_net_WindowsIPC
duk_context *ctx;
void *mServer, *mSocket, *mChain;
HANDLE mPipeHandle;
int backlog;
int endCalled;
int paused;
int totalRead;
void *user1;
void* ipcreserved;
OVERLAPPED read_overlapped;
OVERLAPPED write_overlapped;
@@ -116,6 +119,8 @@ typedef struct ILibDuktape_net_WindowsIPC
#define ILibDuktape_net_Server_Session_buffer "\xFF_SessionFixedBuffer"
#define ILibDuktape_net_socket_ptr "\xFF_SocketPtr"
#define ILibDuktape_net_WindowsIPC_Buffer "\xFF_WindowsIPC"
#define ILibDuktape_net_ConcurrencyArray "\xFF_ConcurrencyArray"
#define ILibDuktape_net_ConcurrencyMaxSize "\xFF_ConcurrencyMaxSize"
#define ILibDuktape_net_WindowsIPC_PendingArray "\xFF_WindowsIPC_PendingArray"
#define ILibDuktape_SERVER2ContextTable "\xFF_Server2ContextTable"
#define ILibDuktape_SERVER2OPTIONS "\xFF_ServerToOptions"
@@ -960,7 +965,16 @@ BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_Error
if (winIPC->mServer != NULL) { winIPC->clientConnected = 0; }
if (winIPC->reservedState != NULL) { ILibChain_WaitHandle_DestroySavedState(chain, winIPC->reservedState); winIPC->reservedState = NULL; }
winIPC->endCalled = 0;
ILibDuktape_DuplexStream_Closed(winIPC->ds);
if (winIPC->endCalled == 0)
{
duk_push_heapptr(winIPC->ctx, winIPC->ds->readableStream->object); // [obj]
duk_prepare_method_call(winIPC->ctx, -1, "end"); // [obj][end][this]
if (duk_pcall_method(winIPC->ctx, 0) != 0) { ILibDuktape_Process_UncaughtExceptionEx(winIPC->ctx, "net.ipcServer.end() Error: "); }
duk_pop_2(winIPC->ctx); // ...
}
if (winIPC->mServer != NULL)
{
@@ -974,6 +988,36 @@ BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_Error
duk_pop(winIPC->ctx); // ...
}
duk_context *_ctx = winIPC->ctx;
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
duk_prepare_method_call(winIPC->ctx, -1, "indexOf"); // [server][array][indexOf][this]
duk_push_heapptr(winIPC->ctx, winIPC->ipcreserved); // [server][array][indexOf][this][buffer]
if (duk_pcall_method(winIPC->ctx, 1) == 0)
{
int ix = duk_get_int(winIPC->ctx, -1); // [server][array][index]
int setup = 0;
if (ix >= 0)
{
if (duk_get_length(_ctx, -2) == Duktape_GetIntPropertyValue(_ctx, -3, ILibDuktape_net_ConcurrencyMaxSize, -1))
{
// We are at the maximum number of concurrent sessions, so after we remove ourselves from the list
// we must prepare for another connection
setup = 1;
}
duk_array_remove(winIPC->ctx, -2, ix);
if (setup != 0)
{
duk_prepare_method_call(_ctx, -3, "listen"); // [server][array][index][listen][this]
duk_get_prop_string(_ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS);// [server][array][index][listen][this][options]
duk_pcall_method(_ctx, 1); duk_pop(_ctx); // [server][array][index]
}
}
}
duk_pop_3(_ctx); // ...
return(FALSE);
}
}
@@ -1093,6 +1137,7 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void *
if (!ILibMemory_CanaryOK(user)) { return; }
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user;
if (winIPC->mServer != NULL && winIPC->mPipeHandle == NULL) { return; } // Already Closed
winIPC->endCalled = 1;
if (winIPC->reservedState != NULL)
{
ILibChain_WaitHandle_DestroySavedState(winIPC->mChain, winIPC->reservedState);
@@ -1112,10 +1157,11 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void *
if (winIPC->read_overlapped.hEvent != NULL) { CloseHandle(winIPC->read_overlapped.hEvent); winIPC->read_overlapped.hEvent = NULL; }
if (winIPC->write_overlapped.hEvent != NULL) { CloseHandle(winIPC->write_overlapped.hEvent); winIPC->write_overlapped.hEvent = NULL; }
if (winIPC->mServer != NULL)
if (winIPC->mServer != NULL && winIPC->backlog == 0)
{
// Server IPC, so we can create a new Instance, and listen for a connection
duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen
// We need to dereference this, because winIPC will go out of scope when we call listen
duk_context *ctx = winIPC->ctx;
CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL;
duk_push_heapptr(ctx, winIPC->mServer); // [server]
@@ -1174,6 +1220,27 @@ BOOL ILibDuktape_net_server_IPC_ConnectSink(void *chain, HANDLE event, ILibWaitH
if (ILibMemory_CanaryOK(user) && status == ILibWaitHandle_ErrorStatus_NONE)
{
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user;
if (winIPC->mServer != NULL)
{
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
int maxCount = Duktape_GetIntPropertyValue(winIPC->ctx, -2, ILibDuktape_net_ConcurrencyMaxSize, 0);
int curCount = (int)duk_get_length(winIPC->ctx, -1);
duk_pop_2(winIPC->ctx); // ...
if (curCount < maxCount)
{
// We are still within concurrency limits, so lets prepare for another connection
duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server]
duk_prepare_method_call(winIPC->ctx, -1, "listen"); // [server][listen][this]
duk_remove(winIPC->ctx, -3); // [listen][this]
duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options]
duk_pcall_method(winIPC->ctx, 1);
duk_pop(winIPC->ctx); // ...
}
}
winIPC->clientConnected = TRUE;
ILibDuktape_EventEmitter_SetupEmit(winIPC->ctx, winIPC->mServer, "connection"); // [emit][this][connection]
duk_push_object(winIPC->ctx); // [emit][this][connection][socket]
@@ -1264,9 +1331,10 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
port = (unsigned short)Duktape_GetIntPropertyValue(ctx, 0, "port", 0);
backlog = Duktape_GetIntPropertyValue(ctx, 0, "backlog", 64);
host = Duktape_GetStringPropertyValue(ctx, 0, "host", NULL);
ipc = Duktape_GetStringPropertyValueEx(ctx, 0, "path", NULL, &ipcLen);
backlog = Duktape_GetIntPropertyValue(ctx, 0, "backlog", (ipc != NULL && port == 0) ? 0 : 64);
if (nargs > 1 && duk_is_function(ctx, 1))
{
// Callback
@@ -1286,6 +1354,11 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
duk_push_this(ctx);
ILibDuktape_WriteID(ctx, "net.ipcServer");
duk_push_string(ctx, ipc); duk_put_prop_string(ctx, -2, ILibDuktape_net_server_IPCPath);
if (backlog >= 0 && !duk_has_prop_string(ctx, -1, ILibDuktape_net_ConcurrencyArray))
{
duk_push_array(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_net_ConcurrencyArray);
duk_push_int(ctx, backlog); duk_put_prop_string(ctx, -2, ILibDuktape_net_ConcurrencyMaxSize);
}
duk_pop(ctx);
#if defined(_POSIX)
@@ -1306,8 +1379,13 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
SECURITY_DESCRIPTOR IPC_SD;
EXPLICIT_ACCESS IPC_EA = { 0 };
duk_push_this(ctx);
duk_push_this(ctx); // [server]
duk_get_prop_string(ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array]
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_net_WindowsIPC));
duk_dup(ctx, -1); // [server][array][buffer][buffer]
duk_array_push(ctx, -3); // [server][array][buffer]
duk_remove(ctx, -2); // [server][buffer]
winIPC->ipcreserved = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, ILibDuktape_net_WindowsIPC_Buffer);
winIPC->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
winIPC->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
@@ -1317,6 +1395,7 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
winIPC->mChain = duk_ctx_chain(ctx);
winIPC->clientConnected = FALSE;
winIPC->metadata = "net.ipcServer";
winIPC->backlog = backlog;
duk_eval_string(ctx, "require('child_process');");
duk_pop(ctx);
@@ -1348,9 +1427,10 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx)
winIPC->mPipeHandle = CreateNamedPipeA((LPCSTR)ipc, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
1, ILibDuktape_net_IPC_BUFFERSIZE, ILibDuktape_net_IPC_BUFFERSIZE, 0, pIPC_SA);
(DWORD)(backlog == 0 ? 1 : backlog), ILibDuktape_net_IPC_BUFFERSIZE, ILibDuktape_net_IPC_BUFFERSIZE, 0, pIPC_SA);
if (winIPC->mPipeHandle == INVALID_HANDLE_VALUE)
{
DWORD err = GetLastError();
CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL;
duk_del_prop_string(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer);
return(ILibDuktape_Error(ctx, "Error Creating Named Pipe: %s", ipc));

View File

@@ -3,52 +3,75 @@ var promise = require('promise');
var nodeid = require('_agentNodeId')();
var ipcPath = process.platform == 'win32' ? ('\\\\.\\pipe\\' + nodeid + '-DAIPC') : (process.cwd() + '/DAIPC');
function queryAgent(obj)
function dataHandler(chunk)
{
var len;
if (chunk.length < 4) { this.unshift(chunk); return; }
if ((len = chunk.readUInt32LE(0)) > chunk.length) { this.unshift(chunk); return; }
var data = chunk.slice(4, len + 4);
var payload = null;
try
{
payload = JSON.parse(data.toString());
}
catch (e)
{
this.promise._rej('Invalid Response Received');
return;
}
try
{
//this.promise._res(payload.result?payload.result:'');
this.promise._res(payload.result, this);
}
catch (x)
{
}
if ((len + 4) < chunk.length) { this.unshift(chunk.slice(4 + len)); }
}
function queryAgent(obj, prev)
{
var ret = new promise(function (res, rej) { this._res = res; this._rej = rej; });
ret._obj = { cmd: 'query', value: obj };
ret.client = require('net').createConnection({ path: ipcPath });
ret.client.promise = ret;
ret.client.on('connect', function ()
console.log(obj, prev);
if (prev == null)
{
this.on('data', function (chunk)
ret.client = require('net').createConnection({ path: ipcPath });
ret.client.on('connect', function ()
{
var len;
if (chunk.length < 4) { this.unshift(chunk); return; }
if ((len = chunk.readUInt32LE(0)) > chunk.length) { this.unshift(chunk); return;}
console.log('ON CONNECT');
this.on('data', dataHandler);
this.on('end', function ()
{
this.promise._rej('closed');
});
var data = chunk.slice(4, len + 4);
var payload = null;
try
{
payload = JSON.parse(data.toString());
}
catch (e)
{
this.promise._rej('Invalid Response Received');
return;
}
try
{
//this.promise._res(payload.result?payload.result:'');
this.promise._res(payload.result);
}
catch(x)
{
}
if ((len + 4) < chunk.length) { this.unshift(chunk.slice(4 + len)); }
var j = Buffer.from(JSON.stringify(ret._obj));
var buf = Buffer.alloc(4 + j.length);
buf.writeUInt32LE(j.length + 4, 0);
j.copy(buf, 4);
this.write(buf);
});
this.on('end', function ()
}
else
{
ret.client = prev;
ret.client.removeAllListeners('data');
ret.client.removeAllListeners('end');
ret.client.on('data', dataHandler);
ret.client.on('end', function ()
{
this.promise._rej('closed');
});
var j = Buffer.from(JSON.stringify(this.promise._obj));
var j = Buffer.from(JSON.stringify(ret._obj));
var buf = Buffer.alloc(4 + j.length);
buf.writeUInt32LE(j.length + 4, 0);
j.copy(buf, 4);
this.write(buf);
});
ret.client.write(buf);
}
ret.client.promise = ret;
return (ret);
}
@@ -61,11 +84,11 @@ function start()
process._exit();
}, 3000);
queryAgent('connection').then(function (res)
queryAgent('connection').then(function (res, connection)
{
if (res == null) { res = '[NOT CONNECTED]'; }
console.log('Mesh Agent connected to: ' + res);
return (queryAgent('descriptors'));
return (queryAgent('descriptors', connection));
}).then(console.log).then(function () { process._exit(); }).catch(function () { process._exit(); });
}