diff --git a/microscript/ILibDuktape_net.c b/microscript/ILibDuktape_net.c index 60f7532..4600e62 100644 --- a/microscript/ILibDuktape_net.c +++ b/microscript/ILibDuktape_net.c @@ -85,9 +85,12 @@ typedef struct ILibDuktape_net_WindowsIPC duk_context *ctx; void *mServer, *mSocket, *mChain; HANDLE mPipeHandle; + int backlog; + int endCalled; int paused; int totalRead; void *user1; + void* ipcreserved; OVERLAPPED read_overlapped; OVERLAPPED write_overlapped; @@ -116,6 +119,8 @@ typedef struct ILibDuktape_net_WindowsIPC #define ILibDuktape_net_Server_Session_buffer "\xFF_SessionFixedBuffer" #define ILibDuktape_net_socket_ptr "\xFF_SocketPtr" #define ILibDuktape_net_WindowsIPC_Buffer "\xFF_WindowsIPC" +#define ILibDuktape_net_ConcurrencyArray "\xFF_ConcurrencyArray" +#define ILibDuktape_net_ConcurrencyMaxSize "\xFF_ConcurrencyMaxSize" #define ILibDuktape_net_WindowsIPC_PendingArray "\xFF_WindowsIPC_PendingArray" #define ILibDuktape_SERVER2ContextTable "\xFF_Server2ContextTable" #define ILibDuktape_SERVER2OPTIONS "\xFF_ServerToOptions" @@ -960,7 +965,16 @@ BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_Error if (winIPC->mServer != NULL) { winIPC->clientConnected = 0; } if (winIPC->reservedState != NULL) { ILibChain_WaitHandle_DestroySavedState(chain, winIPC->reservedState); winIPC->reservedState = NULL; } + + winIPC->endCalled = 0; ILibDuktape_DuplexStream_Closed(winIPC->ds); + if (winIPC->endCalled == 0) + { + duk_push_heapptr(winIPC->ctx, winIPC->ds->readableStream->object); // [obj] + duk_prepare_method_call(winIPC->ctx, -1, "end"); // [obj][end][this] + if (duk_pcall_method(winIPC->ctx, 0) != 0) { ILibDuktape_Process_UncaughtExceptionEx(winIPC->ctx, "net.ipcServer.end() Error: "); } + duk_pop_2(winIPC->ctx); // ... + } if (winIPC->mServer != NULL) { @@ -974,6 +988,36 @@ BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_Error duk_pop(winIPC->ctx); // ... } + duk_context *_ctx = winIPC->ctx; + duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server] + duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array] + duk_prepare_method_call(winIPC->ctx, -1, "indexOf"); // [server][array][indexOf][this] + duk_push_heapptr(winIPC->ctx, winIPC->ipcreserved); // [server][array][indexOf][this][buffer] + if (duk_pcall_method(winIPC->ctx, 1) == 0) + { + int ix = duk_get_int(winIPC->ctx, -1); // [server][array][index] + int setup = 0; + if (ix >= 0) + { + if (duk_get_length(_ctx, -2) == Duktape_GetIntPropertyValue(_ctx, -3, ILibDuktape_net_ConcurrencyMaxSize, -1)) + { + // We are at the maximum number of concurrent sessions, so after we remove ourselves from the list + // we must prepare for another connection + setup = 1; + } + duk_array_remove(winIPC->ctx, -2, ix); + + if (setup != 0) + { + duk_prepare_method_call(_ctx, -3, "listen"); // [server][array][index][listen][this] + duk_get_prop_string(_ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS);// [server][array][index][listen][this][options] + duk_pcall_method(_ctx, 1); duk_pop(_ctx); // [server][array][index] + } + } + } + duk_pop_3(_ctx); // ... + + return(FALSE); } } @@ -1093,6 +1137,7 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void * if (!ILibMemory_CanaryOK(user)) { return; } ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; if (winIPC->mServer != NULL && winIPC->mPipeHandle == NULL) { return; } // Already Closed + winIPC->endCalled = 1; if (winIPC->reservedState != NULL) { ILibChain_WaitHandle_DestroySavedState(winIPC->mChain, winIPC->reservedState); @@ -1112,10 +1157,11 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void * if (winIPC->read_overlapped.hEvent != NULL) { CloseHandle(winIPC->read_overlapped.hEvent); winIPC->read_overlapped.hEvent = NULL; } if (winIPC->write_overlapped.hEvent != NULL) { CloseHandle(winIPC->write_overlapped.hEvent); winIPC->write_overlapped.hEvent = NULL; } - if (winIPC->mServer != NULL) + if (winIPC->mServer != NULL && winIPC->backlog == 0) { // Server IPC, so we can create a new Instance, and listen for a connection - duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen + // We need to dereference this, because winIPC will go out of scope when we call listen + duk_context *ctx = winIPC->ctx; CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; duk_push_heapptr(ctx, winIPC->mServer); // [server] @@ -1174,6 +1220,27 @@ BOOL ILibDuktape_net_server_IPC_ConnectSink(void *chain, HANDLE event, ILibWaitH if (ILibMemory_CanaryOK(user) && status == ILibWaitHandle_ErrorStatus_NONE) { ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; + if (winIPC->mServer != NULL) + { + duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server] + duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array] + int maxCount = Duktape_GetIntPropertyValue(winIPC->ctx, -2, ILibDuktape_net_ConcurrencyMaxSize, 0); + int curCount = (int)duk_get_length(winIPC->ctx, -1); + duk_pop_2(winIPC->ctx); // ... + + if (curCount < maxCount) + { + // We are still within concurrency limits, so lets prepare for another connection + duk_push_heapptr(winIPC->ctx, winIPC->mServer); // [server] + duk_prepare_method_call(winIPC->ctx, -1, "listen"); // [server][listen][this] + duk_remove(winIPC->ctx, -3); // [listen][this] + duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options] + duk_pcall_method(winIPC->ctx, 1); + duk_pop(winIPC->ctx); // ... + } + } + + winIPC->clientConnected = TRUE; ILibDuktape_EventEmitter_SetupEmit(winIPC->ctx, winIPC->mServer, "connection"); // [emit][this][connection] duk_push_object(winIPC->ctx); // [emit][this][connection][socket] @@ -1264,9 +1331,10 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) port = (unsigned short)Duktape_GetIntPropertyValue(ctx, 0, "port", 0); - backlog = Duktape_GetIntPropertyValue(ctx, 0, "backlog", 64); host = Duktape_GetStringPropertyValue(ctx, 0, "host", NULL); ipc = Duktape_GetStringPropertyValueEx(ctx, 0, "path", NULL, &ipcLen); + backlog = Duktape_GetIntPropertyValue(ctx, 0, "backlog", (ipc != NULL && port == 0) ? 0 : 64); + if (nargs > 1 && duk_is_function(ctx, 1)) { // Callback @@ -1286,6 +1354,11 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) duk_push_this(ctx); ILibDuktape_WriteID(ctx, "net.ipcServer"); duk_push_string(ctx, ipc); duk_put_prop_string(ctx, -2, ILibDuktape_net_server_IPCPath); + if (backlog >= 0 && !duk_has_prop_string(ctx, -1, ILibDuktape_net_ConcurrencyArray)) + { + duk_push_array(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_net_ConcurrencyArray); + duk_push_int(ctx, backlog); duk_put_prop_string(ctx, -2, ILibDuktape_net_ConcurrencyMaxSize); + } duk_pop(ctx); #if defined(_POSIX) @@ -1306,8 +1379,13 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) SECURITY_DESCRIPTOR IPC_SD; EXPLICIT_ACCESS IPC_EA = { 0 }; - duk_push_this(ctx); + duk_push_this(ctx); // [server] + duk_get_prop_string(ctx, -1, ILibDuktape_net_ConcurrencyArray); // [server][array] ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_net_WindowsIPC)); + duk_dup(ctx, -1); // [server][array][buffer][buffer] + duk_array_push(ctx, -3); // [server][array][buffer] + duk_remove(ctx, -2); // [server][buffer] + winIPC->ipcreserved = duk_get_heapptr(ctx, -1); duk_put_prop_string(ctx, -2, ILibDuktape_net_WindowsIPC_Buffer); winIPC->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); winIPC->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); @@ -1317,6 +1395,7 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) winIPC->mChain = duk_ctx_chain(ctx); winIPC->clientConnected = FALSE; winIPC->metadata = "net.ipcServer"; + winIPC->backlog = backlog; duk_eval_string(ctx, "require('child_process');"); duk_pop(ctx); @@ -1348,9 +1427,10 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) winIPC->mPipeHandle = CreateNamedPipeA((LPCSTR)ipc, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS, - 1, ILibDuktape_net_IPC_BUFFERSIZE, ILibDuktape_net_IPC_BUFFERSIZE, 0, pIPC_SA); + (DWORD)(backlog == 0 ? 1 : backlog), ILibDuktape_net_IPC_BUFFERSIZE, ILibDuktape_net_IPC_BUFFERSIZE, 0, pIPC_SA); if (winIPC->mPipeHandle == INVALID_HANDLE_VALUE) { + DWORD err = GetLastError(); CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; duk_del_prop_string(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer); return(ILibDuktape_Error(ctx, "Error Creating Named Pipe: %s", ipc)); diff --git a/modules/_agentStatus.js b/modules/_agentStatus.js index 838b122..4b75e89 100644 --- a/modules/_agentStatus.js +++ b/modules/_agentStatus.js @@ -3,52 +3,75 @@ var promise = require('promise'); var nodeid = require('_agentNodeId')(); var ipcPath = process.platform == 'win32' ? ('\\\\.\\pipe\\' + nodeid + '-DAIPC') : (process.cwd() + '/DAIPC'); -function queryAgent(obj) +function dataHandler(chunk) +{ + var len; + if (chunk.length < 4) { this.unshift(chunk); return; } + if ((len = chunk.readUInt32LE(0)) > chunk.length) { this.unshift(chunk); return; } + + var data = chunk.slice(4, len + 4); + var payload = null; + try + { + payload = JSON.parse(data.toString()); + } + catch (e) + { + this.promise._rej('Invalid Response Received'); + return; + } + try + { + //this.promise._res(payload.result?payload.result:''); + this.promise._res(payload.result, this); + } + catch (x) + { + } + if ((len + 4) < chunk.length) { this.unshift(chunk.slice(4 + len)); } +} +function queryAgent(obj, prev) { var ret = new promise(function (res, rej) { this._res = res; this._rej = rej; }); ret._obj = { cmd: 'query', value: obj }; - ret.client = require('net').createConnection({ path: ipcPath }); - ret.client.promise = ret; - ret.client.on('connect', function () + console.log(obj, prev); + if (prev == null) { - this.on('data', function (chunk) + ret.client = require('net').createConnection({ path: ipcPath }); + ret.client.on('connect', function () { - var len; - if (chunk.length < 4) { this.unshift(chunk); return; } - if ((len = chunk.readUInt32LE(0)) > chunk.length) { this.unshift(chunk); return;} + console.log('ON CONNECT'); + this.on('data', dataHandler); + this.on('end', function () + { + this.promise._rej('closed'); + }); - var data = chunk.slice(4, len + 4); - var payload = null; - try - { - payload = JSON.parse(data.toString()); - } - catch (e) - { - this.promise._rej('Invalid Response Received'); - return; - } - try - { - //this.promise._res(payload.result?payload.result:''); - this.promise._res(payload.result); - } - catch(x) - { - } - if ((len + 4) < chunk.length) { this.unshift(chunk.slice(4 + len)); } + var j = Buffer.from(JSON.stringify(ret._obj)); + var buf = Buffer.alloc(4 + j.length); + buf.writeUInt32LE(j.length + 4, 0); + j.copy(buf, 4); + this.write(buf); }); - this.on('end', function () + } + else + { + ret.client = prev; + ret.client.removeAllListeners('data'); + ret.client.removeAllListeners('end'); + ret.client.on('data', dataHandler); + ret.client.on('end', function () { this.promise._rej('closed'); }); - var j = Buffer.from(JSON.stringify(this.promise._obj)); + var j = Buffer.from(JSON.stringify(ret._obj)); var buf = Buffer.alloc(4 + j.length); buf.writeUInt32LE(j.length + 4, 0); j.copy(buf, 4); - this.write(buf); - }); + ret.client.write(buf); + } + ret.client.promise = ret; return (ret); } @@ -61,11 +84,11 @@ function start() process._exit(); }, 3000); - queryAgent('connection').then(function (res) + queryAgent('connection').then(function (res, connection) { if (res == null) { res = '[NOT CONNECTED]'; } console.log('Mesh Agent connected to: ' + res); - return (queryAgent('descriptors')); + return (queryAgent('descriptors', connection)); }).then(console.log).then(function () { process._exit(); }).catch(function () { process._exit(); }); }