diff --git a/microscript/ILibDuktape_ChildProcess.c b/microscript/ILibDuktape_ChildProcess.c index a118aea..4cb62c5 100644 --- a/microscript/ILibDuktape_ChildProcess.c +++ b/microscript/ILibDuktape_ChildProcess.c @@ -170,6 +170,7 @@ duk_ret_t ILibDuktape_ChildProcess_waitExit(duk_context *ctx) } duk_push_this(ctx); // [spawnedProcess] + char *_target = Duktape_GetStringPropertyValue(ctx, -1, "_target", NULL); if (!ILibChain_IsLinkAlive(Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Manager))) { return(ILibDuktape_Error(ctx, "Cannot waitExit() because JS Engine is exiting")); @@ -179,7 +180,14 @@ duk_ret_t ILibDuktape_ChildProcess_waitExit(duk_context *ctx) duk_put_prop_string(ctx, -2, "\xFF_WaitExit"); // [spawnedProcess] void *mods[] = { ILibGetBaseTimer(Duktape_GetChain(ctx)), Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Manager) }; +#ifdef WIN32 + HANDLE handles[] = { NULL, NULL, NULL, NULL }; + ILibProcessPipe_Process p = Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Process); + ILibProcessPipe_Process_GetWaitHandles(p, &(handles[0]), &(handles[1]), &(handles[2]), &(handles[3])); + ILibChain_Continue(chain, (ILibChain_Link**)mods, 2, timeout, (HANDLE**)handles); +#else ILibChain_Continue(chain, (ILibChain_Link**)mods, 2, timeout); +#endif return(0); } @@ -396,6 +404,7 @@ duk_ret_t ILibDuktape_ChildProcess_execFile(duk_context *ctx) return(ILibDuktape_Error(ctx, "child_process.execFile(): Could not exec [%s]", target)); } ILibDuktape_ChildProcess_SpawnedProcess_PUSH(ctx, p, callback); + duk_push_string(ctx, target); duk_put_prop_string(ctx, -2, "_target"); duk_push_pointer(ctx, manager); duk_put_prop_string(ctx, -2, ILibDuktape_ChildProcess_Manager); return(1); } diff --git a/microscript/ILibDuktape_Helpers.h b/microscript/ILibDuktape_Helpers.h index 89cd3d2..c16551c 100644 --- a/microscript/ILibDuktape_Helpers.h +++ b/microscript/ILibDuktape_Helpers.h @@ -93,6 +93,9 @@ void *Duktape_Duplicate_GetBufferPropertyEx(duk_context *ctx, duk_idx_t i, char* char *Duktape_Duplicate_GetStringEx(duk_context *ctx, duk_idx_t i, duk_size_t *len); #define Duktape_Duplicate_GetString(ctx, i) Duktape_Duplicate_GetStringEx(ctx, i, NULL) +#define duk_array_shift(ctx, i) duk_dup(ctx, i);duk_get_prop_string(ctx, -1, "shift");duk_swap_top(ctx, -2);duk_call_method(ctx, 0); +#define duk_array_pop(ctx, i) duk_dup(ctx, i);duk_get_prop_string(ctx, -1, "pop");duk_swap_top(ctx, -2);duk_call_method(ctx, 0); + int Duktape_GetBooleanProperty(duk_context *ctx, duk_idx_t i, char *propertyName, int defaultValue); struct sockaddr_in6* Duktape_IPAddress4_FromString(char* address, unsigned short port); struct sockaddr_in6* Duktape_IPAddress6_FromString(char* address, unsigned short port); diff --git a/microscript/ILibDuktape_net.c b/microscript/ILibDuktape_net.c index c20b75d..05e3479 100644 --- a/microscript/ILibDuktape_net.c +++ b/microscript/ILibDuktape_net.c @@ -82,14 +82,18 @@ int ILibDuktape_TLS_ctx2server = -1; #define ILibDuktape_net_IPC_BUFFERSIZE 4096 typedef struct ILibDuktape_net_WindowsIPC { - ILibProcessPipe_Manager manager; duk_context *ctx; void *mServer, *mSocket, *mChain; HANDLE mPipeHandle; - ILibProcessPipe_Pipe mPipe; + int paused; + int totalRead; + void *user1; + OVERLAPPED read_overlapped; + OVERLAPPED write_overlapped; OVERLAPPED overlapped; ILibDuktape_DuplexStream *ds; + BOOL clientConnected; ULONG_PTR _reserved[5]; @@ -377,13 +381,12 @@ duk_ret_t ILibDuktape_net_socket_connect(duk_context *ctx) ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_net_WindowsIPC)); duk_put_prop_string(ctx, -2, ILibDuktape_net_WindowsIPC_Buffer); winIPC->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + winIPC->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + winIPC->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); winIPC->ctx = ctx; winIPC->mSocket = duk_get_heapptr(ctx, -1); - winIPC->mChain = Duktape_GetChain(ctx); - - duk_eval_string(ctx, "require('child_process');"); - winIPC->manager = (ILibProcessPipe_Manager)Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Manager); - duk_pop(ctx); + winIPC->mChain = duk_ctx_chain(ctx); + winIPC->paused = 1; if ((winIPC->mPipeHandle = CreateFileA(path, GENERIC_READ | FILE_WRITE_DATA, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0)) == INVALID_HANDLE_VALUE) { @@ -394,7 +397,6 @@ duk_ret_t ILibDuktape_net_socket_connect(duk_context *ctx) { // SUCCESS winIPC->ds = ILibDuktape_DuplexStream_InitEx(winIPC->ctx, ILibDuktape_net_server_IPC_WriteSink, ILibDuktape_net_server_IPC_EndSink, ILibDuktape_net_server_IPC_PauseSink, ILibDuktape_net_server_IPC_ResumeSink, ILibDuktape_net_server_IPC_unshiftSink, winIPC); - winIPC->mPipe = ILibProcessPipe_Pipe_CreateFromExisting(winIPC->manager, winIPC->mPipeHandle, ILibProcessPipe_Pipe_ReaderHandleType_Overlapped); winIPC->ds->readableStream->paused = 1; ILibDuktape_EventEmitter_AddHook(ILibDuktape_EventEmitter_GetEmitter(winIPC->ctx, -1), "data", ILibDuktape_net_socket_ipc_dataHookCallback); ILibDuktape_EventEmitter_AddHook(ILibDuktape_EventEmitter_GetEmitter(winIPC->ctx, -1), "end", ILibDuktape_net_socket_ipc_dataHookCallback); @@ -837,7 +839,99 @@ void ILibDuktape_net_server_OnSendOK(ILibAsyncServerSocket_ServerModule AsyncSer } #ifdef WIN32 -extern void ILibProcessPipe_FreePipe(ILibProcessPipe_Pipe pipeObject); +BOOL ILibDuktape_server_ipc_ReadSink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user) +{ + ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; + int consumed = 0; + + if (status == ILibWaitHandle_ErrorStatus_NONE) + { + winIPC->totalRead += bytesRead; + do + { + winIPC->unshiftedBytes = 0; + if (winIPC->totalRead > 0) + { + ILibDuktape_DuplexStream_WriteData(winIPC->ds, winIPC->buffer + winIPC->bufferOffset, winIPC->totalRead); + } + if (winIPC->unshiftedBytes > winIPC->totalRead) { winIPC->unshiftedBytes = winIPC->totalRead; } + winIPC->bufferOffset += (winIPC->totalRead - winIPC->unshiftedBytes); + winIPC->totalRead -= (winIPC->totalRead - winIPC->unshiftedBytes); + } while (winIPC->paused == 0 && consumed != 0 && winIPC->totalRead > 0); + if (winIPC->totalRead == 0) { winIPC->bufferOffset = 0; } + if (winIPC->paused == 0) + { + if (winIPC->bufferOffset > 0) + { + memmove_s(winIPC->buffer, winIPC->bufferLength, winIPC->buffer + winIPC->bufferOffset, winIPC->totalRead); + winIPC->bufferOffset = 0; + } + else if (winIPC->totalRead == winIPC->bufferLength) + { + ILibMemory_ReallocateRaw(&(winIPC->buffer), winIPC->bufferLength == 0 ? ILibDuktape_net_IPC_BUFFERSIZE : winIPC->bufferLength * 2); + winIPC->bufferLength = winIPC->bufferLength == 0 ? ILibDuktape_net_IPC_BUFFERSIZE : winIPC->bufferLength * 2; + } + ILibChain_ReadEx(chain, h, &(winIPC->read_overlapped), winIPC->buffer + winIPC->bufferOffset + winIPC->totalRead, winIPC->bufferLength - winIPC->totalRead, ILibDuktape_server_ipc_ReadSink, winIPC); + return(TRUE); + } + else + { + return(FALSE); + } + } + else + { + // I/O Errors + ILibDuktape_DuplexStream_Closed(winIPC->ds); + return(FALSE); + } +} +BOOL ILibDuktape_server_ipc_WriteSink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, int bytesWritten, void* user) +{ + if (!ILibMemory_CanaryOK(user)) { return(FALSE); } + ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; + duk_idx_t top = duk_get_top(winIPC->ctx); + duk_size_t bufLen; + char *buf; + ILibTransport_DoneState d = ILibTransport_DoneState_COMPLETE; + BOOL ret; + + duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [obj] + duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_WindowsIPC_PendingArray); // [obj][array] + + while (d == ILibTransport_DoneState_COMPLETE) + { + duk_dup(winIPC->ctx, -1); // [obj][array][array] + duk_get_prop_string(winIPC->ctx, -1, "shift"); // [obj][array][array][shift] + duk_swap_top(winIPC->ctx, -2); // [obj][array][shift][this] + if (duk_pcall_method(winIPC->ctx, 0) != 0) { duk_set_top(winIPC->ctx, top); return(FALSE); } // [obj][array][buffer] + duk_pop(winIPC->ctx); // [obj][array] + if (duk_get_length(winIPC->ctx, -1) == 0) { break; } + duk_get_prop_index(winIPC->ctx, -1, 0); // [obj][array][buffer] + buf = Duktape_GetBuffer(winIPC->ctx, -1, &bufLen); + d = ILibChain_WriteEx(chain, h, &(winIPC->write_overlapped), buf, (int)bufLen, ILibDuktape_server_ipc_WriteSink, winIPC); + duk_pop(winIPC->ctx); // [obj][array] + } + + switch (d) + { + case ILibTransport_DoneState_COMPLETE: + // No more pending writes, so we can emit drain + ILibDuktape_DuplexStream_Ready(winIPC->ds); + ret = FALSE; + break; + case ILibTransport_DoneState_INCOMPLETE: + // Still pending writes, so return TRUE, so we can get evented later + ret = TRUE; + break; + case ILibTransport_DoneState_ERROR: + ret = FALSE; + break; + } + + duk_set_top(winIPC->ctx, top); // ... + return(ret); +} int ILibDuktape_net_server_IPC_unshiftSink(ILibDuktape_DuplexStream *sender, int unshiftBytes, void *user) { @@ -846,166 +940,29 @@ int ILibDuktape_net_server_IPC_unshiftSink(ILibDuktape_DuplexStream *sender, int winIPC->unshiftedBytes = unshiftBytes; return(unshiftBytes); } -void ILibDuktape_net_server_IPC_readsink(ILibProcessPipe_Pipe sender, void *user, DWORD dwErrorCode, char *buffer, int bufferLen) -{ - if (!ILibMemory_CanaryOK(user)) { return; } - ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; - if (dwErrorCode == 0) - { - winIPC->bytesLeft += bufferLen; - ILibDuktape_net_server_IPC_ResumeSink(winIPC->ds, winIPC); - } - else - { - ILibDuktape_DuplexStream_Closed(winIPC->ds); - ILibProcessPipe_FreePipe(winIPC->mPipe); - winIPC->mPipe = NULL; winIPC->mPipeHandle = NULL; - if (winIPC->buffer != NULL) { free(winIPC->buffer); winIPC->buffer = NULL; } - if (winIPC->mServer != NULL) - { - // Server IPC, so we can create a new Instance, and listen for a connection - duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen - CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; - - duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [connection] - duk_del_prop_string(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer); duk_pop(winIPC->ctx); // ... - - duk_push_heapptr(ctx, winIPC->mServer); // [server] - if (Duktape_GetBooleanProperty(ctx, -1, ILibDuktape_net_server_closed, 0) == 0) - { - duk_get_prop_string(ctx, -1, "listen"); // [server][listen] - duk_swap_top(ctx, -2); // [listen][this] - duk_get_prop_string(ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options] - duk_pcall_method(ctx, 1); // [ret] - } - else if (Duktape_GetBooleanProperty(ctx, -1, ILibDuktape_net_server_closed_needEmit, 0) != 0) - { - duk_push_false(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_net_server_closed_needEmit); - ILibDuktape_EventEmitter_SetupEmit(ctx, winIPC->mServer, "close"); // [emit][this][closed] - duk_pcall_method(ctx, 1); // [ret] - } - duk_pop(ctx); // ... - } - } -} void ILibDuktape_net_server_IPC_PauseSink(ILibDuktape_DuplexStream *sender, void *user) { // No-OP, becuase all we need to so is set Paused flag, which is already the case when we get here + ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; + winIPC->paused = 1; } void ILibDuktape_net_server_IPC_ResumeSink(ILibDuktape_DuplexStream *sender, void *user) { ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; - if (winIPC->mPipeHandle == NULL) { return; } - - if (winIPC->buffer == NULL) - { - winIPC->buffer = ILibMemory_Allocate(ILibDuktape_net_IPC_BUFFERSIZE, 0, NULL, NULL); - winIPC->bufferLength = ILibDuktape_net_IPC_BUFFERSIZE; - winIPC->bufferOffset = 0; - winIPC->bytesLeft = 0; - } - - if (winIPC->bytesLeft <= 0) - { - winIPC->bytesLeft = 0; - winIPC->bufferOffset = 0; - if (ILibProcessPipe_Pipe_ReadEx(winIPC->mPipe, winIPC->buffer, winIPC->bufferLength, winIPC, ILibDuktape_net_server_IPC_readsink) != 0) - { - ILibDuktape_net_server_IPC_readsink(winIPC->mPipe, winIPC, 1, NULL, 0); - } - } - else - { - // Check to see if we can drain any of the buffer first - while (winIPC->ds->readableStream->paused == 0) - { - winIPC->unshiftedBytes = 0; - ILibDuktape_DuplexStream_WriteData(winIPC->ds, winIPC->buffer + winIPC->bufferOffset, winIPC->bytesLeft); - if (winIPC->mPipe == NULL) { return; } // We return here without resetting processingRead, because IO was canceled - if (winIPC->unshiftedBytes > 0) - { - if (winIPC->unshiftedBytes == winIPC->bytesLeft) - { - // Unshift the entire buffer - winIPC->unshiftedBytes = 0; - } - else - { - // Unshift some of the buffer - winIPC->bufferOffset += (winIPC->bytesLeft - winIPC->unshiftedBytes); - if (winIPC->bytesLeft == winIPC->unshiftedBytes) - { - winIPC->unshiftedBytes = 0; - } - winIPC->bytesLeft = winIPC->unshiftedBytes; - } - } - else - { - winIPC->bufferOffset = winIPC->bytesLeft = 0; - } - - if (winIPC->ds->readableStream->paused == 0 && (winIPC->bytesLeft == 0 || (winIPC->bytesLeft > 0 && winIPC->unshiftedBytes == 0))) - { - if (winIPC->bufferLength - winIPC->bufferOffset - winIPC->bytesLeft == 0) - { - // We need to grow the buffer - ILibMemory_ReallocateRaw(&(winIPC->buffer), winIPC->bufferLength + ILibDuktape_net_IPC_BUFFERSIZE); - winIPC->bufferLength += ILibDuktape_net_IPC_BUFFERSIZE; - } - if (ILibProcessPipe_Pipe_ReadEx(winIPC->mPipe, winIPC->buffer + winIPC->bufferOffset + winIPC->bytesLeft, winIPC->bufferLength - winIPC->bufferOffset - winIPC->bytesLeft, winIPC, ILibDuktape_net_server_IPC_readsink) != 0) - { - ILibDuktape_net_server_IPC_readsink(winIPC->mPipe, winIPC, 1, NULL, 0); - } - break; - } - } - } + winIPC->paused = 0; + ILibDuktape_server_ipc_ReadSink(winIPC->mChain, winIPC->mPipeHandle, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, winIPC); } -void ILibDuktape_net_server_IPC_WriteCompletionEvent(ILibProcessPipe_Pipe sender, void *user, DWORD errorCode, int bytesWritten) -{ - if (!ILibMemory_CanaryOK(user) || errorCode != 0) { return; } - ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; - duk_idx_t top = duk_get_top(winIPC->ctx); - duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [obj] - duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_WindowsIPC_PendingArray); // [obj][array] - duk_get_prop_string(winIPC->ctx, -1, "shift"); // [obj][array][shift] - duk_dup(winIPC->ctx, -2); // [obj][array][shift][this] - if (duk_pcall_method(winIPC->ctx, 0) != 0) // [obj][array][buffer] - { - ILibDuktape_Process_UncaughtExceptionEx(winIPC->ctx, "Internal Error: net.socket.ipc.writeCompletionEvent"); - duk_set_top(winIPC->ctx, top); // ... - return; - } - duk_pop(winIPC->ctx); // [obj][array] - if (duk_get_length(winIPC->ctx, -1) > 0) - { - // Still pending Writes - duk_get_prop_index(winIPC->ctx, -1, 0); // [obj][array][buffer] - duk_size_t bufLen; - char *buf = (char*)Duktape_GetBuffer(winIPC->ctx, -1, &bufLen); - duk_set_top(winIPC->ctx, top); // ... - ILibProcessPipe_Pipe_WriteEx(winIPC->mPipe, buf, (int)bufLen, winIPC, ILibDuktape_net_server_IPC_WriteCompletionEvent); - } - else - { - // No more pending writes, so we can emit drain - duk_set_top(winIPC->ctx, top); // ... - ILibDuktape_DuplexStream_Ready(winIPC->ds); - } -} ILibTransport_DoneState ILibDuktape_net_server_IPC_WriteSink(ILibDuktape_DuplexStream *stream, char *buffer, int bufferLen, void *user) { if (!ILibMemory_CanaryOK(user)) { return(ILibTransport_DoneState_ERROR); } ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; - if (!duk_ctx_is_alive(winIPC->ctx) || winIPC->mPipe == NULL) { return(ILibTransport_DoneState_ERROR); } + if (!duk_ctx_is_alive(winIPC->ctx) || winIPC->mPipeHandle == NULL) { return(ILibTransport_DoneState_ERROR); } duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [obj] duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_WindowsIPC_PendingArray); // [obj][array] - char *q = duk_push_fixed_buffer(winIPC->ctx, bufferLen); // [obj][array][buffer] duk_size_t len = duk_get_length(winIPC->ctx, -2); duk_put_prop_index(winIPC->ctx, -2, (duk_uarridx_t)len); // [obj][array] @@ -1015,7 +972,15 @@ ILibTransport_DoneState ILibDuktape_net_server_IPC_WriteSink(ILibDuktape_DuplexS if (len == 0) { // No Pending Writes - return(ILibProcessPipe_Pipe_WriteEx(winIPC->mPipe, q, bufferLen, winIPC, ILibDuktape_net_server_IPC_WriteCompletionEvent)); + ILibTransport_DoneState ret = ILibChain_WriteEx(winIPC->mChain, winIPC->mPipeHandle, &(winIPC->write_overlapped), q, bufferLen, ILibDuktape_server_ipc_WriteSink, winIPC); + if (ret != ILibTransport_DoneState_INCOMPLETE) + { + duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [obj] + duk_get_prop_string(winIPC->ctx, -1, ILibDuktape_net_WindowsIPC_PendingArray); // [obj][array] + duk_array_shift(winIPC->ctx, -1); // [obj][array][val] + duk_pop_3(winIPC->ctx); // ... + } + return(ret); } return(ILibTransport_DoneState_INCOMPLETE); @@ -1025,36 +990,45 @@ void ILibDuktape_net_server_IPC_EndSink(ILibDuktape_DuplexStream *stream, void * if (!ILibMemory_CanaryOK(user)) { return; } ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; - //if (ILibProcessPipe_Pipe_CancelEx(winIPC->mPipe) == 0) - //{ - ILibProcessPipe_FreePipe(winIPC->mPipe); - winIPC->mPipe = NULL; winIPC->mPipeHandle = NULL; - if (winIPC->mServer != NULL) - { - // Server IPC, so we can create a new Instance, and listen for a connection - duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen - CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; + if (winIPC->mPipeHandle != NULL) { CloseHandle(winIPC->mPipeHandle); winIPC->mPipeHandle = NULL; } + if (winIPC->read_overlapped.hEvent != NULL) { CloseHandle(winIPC->read_overlapped.hEvent); winIPC->read_overlapped.hEvent = NULL; } + if (winIPC->write_overlapped.hEvent != NULL) { CloseHandle(winIPC->write_overlapped.hEvent); winIPC->write_overlapped.hEvent = NULL; } - duk_push_heapptr(ctx, winIPC->mServer); // [server] - duk_get_prop_string(ctx, -1, "listen"); // [server][listen] - duk_swap_top(ctx, -2); // [listen][this] - duk_get_prop_string(ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options] - duk_pcall_method(ctx, 1); duk_pop(ctx); // ... - } - //} + if (winIPC->mServer != NULL) + { + // Server IPC, so we can create a new Instance, and listen for a connection + duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen + CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; + + duk_push_heapptr(ctx, winIPC->mServer); // [server] + duk_get_prop_string(ctx, -1, "listen"); // [server][listen] + duk_swap_top(ctx, -2); // [listen][this] + duk_get_prop_string(ctx, -1, ILibDuktape_SERVER2LISTENOPTIONS); // [listen][this][options] + duk_pcall_method(ctx, 1); duk_pop(ctx); // ... + } } duk_ret_t ILibDuktape_net_server_IPC_ConnectSink_Finalizer(duk_context *ctx) { ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_GetBufferProperty(ctx, 0, ILibDuktape_net_WindowsIPC_Buffer); if (winIPC != NULL) { - if (winIPC->mPipe != NULL && winIPC->mPipeHandle != NULL) + if (winIPC->mPipeHandle != NULL) { - // It's ok to do this, becuase the CancelEx happens on the same thread, and the completion routine will use an APC Queue, so the Canary will fail before it tries to deref - //ILibProcessPipe_Pipe_CancelEx(winIPC->mPipe); - ILibProcessPipe_FreePipe(winIPC->mPipe); - winIPC->mPipe = NULL; winIPC->mPipeHandle = NULL; + CloseHandle(winIPC->mPipeHandle); + winIPC->mPipeHandle = NULL; } + if (winIPC->read_overlapped.hEvent != NULL) + { + CloseHandle(winIPC->read_overlapped.hEvent); + winIPC->read_overlapped.hEvent = NULL; + } + if (winIPC->write_overlapped.hEvent != NULL) + { + CloseHandle(winIPC->write_overlapped.hEvent); + winIPC->write_overlapped.hEvent = NULL; + } + + if (winIPC->buffer != NULL) { free(winIPC->buffer); } } return(0); } @@ -1064,6 +1038,7 @@ BOOL ILibDuktape_net_server_IPC_ConnectSink(void *chain, HANDLE event, ILibWaitH if (ILibMemory_CanaryOK(user) && status == ILibWaitHandle_ErrorStatus_NONE) { ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; + winIPC->clientConnected = TRUE; ILibDuktape_EventEmitter_SetupEmit(winIPC->ctx, winIPC->mServer, "connection"); // [emit][this][connection] duk_push_object(winIPC->ctx); // [emit][this][connection][socket] ILibDuktape_WriteID(winIPC->ctx, "net.socket.ipc"); @@ -1076,8 +1051,9 @@ BOOL ILibDuktape_net_server_IPC_ConnectSink(void *chain, HANDLE event, ILibWaitH duk_push_array(winIPC->ctx); duk_put_prop_string(winIPC->ctx, -2, ILibDuktape_net_WindowsIPC_PendingArray); winIPC->mSocket = duk_get_heapptr(winIPC->ctx, -1); winIPC->ds = ILibDuktape_DuplexStream_InitEx(winIPC->ctx, ILibDuktape_net_server_IPC_WriteSink, ILibDuktape_net_server_IPC_EndSink, ILibDuktape_net_server_IPC_PauseSink, ILibDuktape_net_server_IPC_ResumeSink, ILibDuktape_net_server_IPC_unshiftSink, winIPC); - winIPC->mPipe = ILibProcessPipe_Pipe_CreateFromExisting(winIPC->manager, winIPC->mPipeHandle, ILibProcessPipe_Pipe_ReaderHandleType_Overlapped); winIPC->ds->readableStream->paused = 1; + winIPC->paused = 1; + ILibDuktape_EventEmitter_AddHook(ILibDuktape_EventEmitter_GetEmitter(winIPC->ctx, -1), "data", ILibDuktape_net_socket_ipc_dataHookCallback); ILibDuktape_EventEmitter_AddHook(ILibDuktape_EventEmitter_GetEmitter(winIPC->ctx, -1), "end", ILibDuktape_net_socket_ipc_dataHookCallback); @@ -1191,12 +1167,14 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_net_WindowsIPC)); duk_put_prop_string(ctx, -2, ILibDuktape_net_WindowsIPC_Buffer); winIPC->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + winIPC->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + winIPC->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); winIPC->ctx = ctx; winIPC->mServer = duk_get_heapptr(ctx, -1); - winIPC->mChain = Duktape_GetChain(ctx); + winIPC->mChain = duk_ctx_chain(ctx); + winIPC->clientConnected = FALSE; duk_eval_string(ctx, "require('child_process');"); - winIPC->manager = (ILibProcessPipe_Manager)Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Manager); duk_pop(ctx); if (Duktape_GetBooleanProperty(ctx, 0, "writableAll", 0) != 0) @@ -1235,7 +1213,6 @@ duk_ret_t ILibDuktape_net_server_listen(duk_context *ctx) } //printf("ConnectNamedPipe(%s)\n", ipc); ConnectNamedPipe(winIPC->mPipeHandle, &winIPC->overlapped); - //ILibProcessPipe_WaitHandle_Add2(winIPC->manager, winIPC->overlapped.hEvent, winIPC, ILibDuktape_net_server_IPC_ConnectSink); ILibChain_AddWaitHandle(duk_ctx_chain(ctx), winIPC->overlapped.hEvent, -1, ILibDuktape_net_server_IPC_ConnectSink, winIPC); if (pIPC_SA != NULL) { LocalFree(IPC_ACL); } @@ -1381,7 +1358,7 @@ duk_ret_t ILibDuktape_net_server_close(duk_context *ctx) ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)Duktape_GetBufferProperty(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer); if (winIPC != NULL && winIPC->mPipeHandle != NULL) { - if (winIPC->mPipe == NULL) + if (winIPC->clientConnected == FALSE) { // Listening DisconnectNamedPipe(winIPC->mPipeHandle); diff --git a/microstack/ILibParsers.c b/microstack/ILibParsers.c index b34efde..82438e9 100644 --- a/microstack/ILibParsers.c +++ b/microstack/ILibParsers.c @@ -2091,7 +2091,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s } return(slct); } -void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, struct timeval *tv, DWORD *timeout, fd_set *readset, fd_set *writeset, fd_set *errorset, ILibLinkedList handleList) +void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, struct timeval *tv, DWORD *timeout, fd_set *readset, fd_set *writeset, fd_set *errorset, ILibLinkedList handleList, HANDLE **onlyHandles) { HANDLE selectHandles[FD_SETSIZE]; memset(selectHandles, 0, sizeof(selectHandles)); @@ -2101,7 +2101,7 @@ void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, stru *waitListCount = 0; return; } - + int chkIndex; void *node; struct timeval currentTime; struct timeval expirationTime; @@ -2151,6 +2151,22 @@ void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, stru node = ILibLinkedList_GetNode_Head(handleList); while (node != NULL) { + if (onlyHandles != NULL) + { + for (chkIndex = 0; onlyHandles[chkIndex] != NULL; ++chkIndex) + { + if ((HANDLE)ILibLinkedList_GetDataFromNode(node) == onlyHandles[chkIndex]) + { + chkIndex = -1; + break; + } + } + if (chkIndex != -1) + { + node = ILibLinkedList_GetNextNode(node); + continue; + } + } i = x++; if (waitList[i] != NULL && waitList[ILibChain_HandleInfoIndex(i)] == NULL) { @@ -2184,7 +2200,11 @@ ILibChain_ContinuationStates ILibChain_GetContinuationState(void *chain) { return(((ILibBaseChain*)chain)->continuationState); } +#ifdef WIN32 +ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, int moduleCount, int maxTimeout, HANDLE **handles) +#else ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, int moduleCount, int maxTimeout) +#endif { ILibBaseChain *chain = (ILibBaseChain*)Chain; ILibChain_Link_Hook *nodeHook; @@ -2207,6 +2227,11 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, gettimeofday(&startTime, NULL); ILibRemoteLogging_printf(ILibChainGetLogger(chain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ContinueChain..."); +#ifdef WIN32 + HANDLE currentHandle = chain->currentHandle; + ILibChain_WaitHandleInfo* currentInfo = chain->currentInfo; +#endif + while (root->TerminateFlag == 0 && root->continuationState == ILibChain_ContinuationState_CONTINUE) { if (maxTimeout > 0) @@ -2285,7 +2310,7 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, int x = 0; DWORD waitTimeout = 0; - ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles); + ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles, handles); slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout); #else slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); @@ -2351,6 +2376,10 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, ILibRemoteLogging_printf(ILibChainGetLogger(chain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ContinueChain...Ending..."); root->node = currentNode; +#ifdef WIN32 + root->currentHandle = currentHandle; + root->currentInfo = currentInfo; +#endif } ILibExportMethod void ILibChain_EndContinue(void *chain) @@ -2991,16 +3020,46 @@ void *ILibChain_GetObjectForDescriptor(void *chain, int fd) } #ifdef WIN32 -BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user) +BOOL ILibChain_WriteEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user) { - ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user; - DWORD bytesRead = 0; + ILibChain_WriteEx_data *data = (ILibChain_WriteEx_data*)user; + DWORD bytesWritten = 0; - if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0) + if (GetOverlappedResult(data->fileHandle, data->p, &bytesWritten, FALSE) && bytesWritten > 0) { - if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); } - ILibMemory_Free(data); - return(FALSE); + data->bytesLeft -= (int)bytesWritten; + data->totalWritten += (int)bytesWritten; + data->buffer = data->buffer + bytesWritten; + if (data->bytesLeft == 0) + { + // Done Writing + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->totalWritten, data->user); } + ILibMemory_Free(data); + return(FALSE); + } + else + { + // More Data to write + BOOL ret = FALSE; + switch (ILibChain_WriteEx(chain, h, data->p, data->buffer, data->bytesLeft, ILibChain_WriteEx_Sink, data)) + { + case ILibTransport_DoneState_COMPLETE: + data->totalWritten += data->bytesLeft; + data->bytesLeft = 0; + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->totalWritten, data->user); } + ILibMemory_Free(data); + ret = FALSE; + break; + case ILibTransport_DoneState_INCOMPLETE: + ret = TRUE; + case ILibTransport_DoneState_ERROR: + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, 0, data->user); } + ILibMemory_Free(data); + ret = FALSE; + break; + } + return(ret); + } } else { @@ -3010,6 +3069,35 @@ BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus sta return(TRUE); } else + { + // ERROR + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, 0, data->user); } + ILibMemory_Free(data); + return(FALSE); + } + + } +} +BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user) +{ + ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user; + DWORD bytesRead = 0; + DWORD err; + + if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0) + { + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); } + ILibMemory_Free(data); + return(FALSE); + } + else + { + if ((err=GetLastError()) == ERROR_IO_PENDING) + { + // Still pending, so wait for another callback + return(TRUE); + } + else { // ERROR if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, data->buffer, 0, data->user); } @@ -3018,6 +3106,37 @@ BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus sta } } } +ILibTransport_DoneState ILibChain_WriteEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_WriteEx_Handler handler, void *user) +{ + int e = 0; + if (!WriteFile(h, buffer, (DWORD)bufferLen, NULL, p)) + { + if ((e = GetLastError()) == ERROR_IO_PENDING) + { + // Completing Asynchronously + ILibChain_WriteEx_data *state = (ILibChain_WriteEx_data*)ILibMemory_SmartAllocate(sizeof(ILibChain_WriteEx_data)); + state->buffer = buffer; + state->bytesLeft = bufferLen; + state->totalWritten = 0; + state->p = p; + state->handler = handler; + state->fileHandle = h; + state->user = user; + ILibChain_AddWaitHandle(chain, p->hEvent, -1, ILibChain_WriteEx_Sink, state); + return(ILibTransport_DoneState_INCOMPLETE); + } + else + { + // IO Error + return(ILibTransport_DoneState_ERROR); + } + } + else + { + // Write Completed + return(ILibTransport_DoneState_COMPLETE); + } +} void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user) { DWORD bytesRead = 0; @@ -3080,26 +3199,30 @@ void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_Wai return; } + ILibChain_WaitHandleInfo *info = NULL; + if (((ILibBaseChain*)chain)->currentHandle != h) { void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h); - ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node); + info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node); + info->node = node; + ILibForceUnBlockChain(chain); + } + else + { + ((ILibBaseChain*)chain)->currentHandle = NULL; + info = ((ILibBaseChain*)chain)->currentInfo; + } + if (info != NULL) + { info->handler = handler; info->user = user; - info->node = node; if (msTIMEOUT != INFINITE && msTIMEOUT >= 0) { ILibGetTimeOfDay(&(info->expiration)); info->expiration.tv_sec += (long)(msTIMEOUT / 1000); info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000); } - ILibForceUnBlockChain(chain); - } - else - { - // We are trying to add ourselves, so we can optimize by not deleting, intead of adding new - ((ILibBaseChain*)chain)->currentHandle = NULL; - if (((ILibBaseChain*)chain)->currentInfo->user != user) { ((ILibBaseChain*)chain)->currentInfo->user = user; } } } void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u) @@ -3114,7 +3237,10 @@ void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u) // We found the HANDLE, so if we remove the HANDLE from the list, and // set the unblock flag, we'll be good to go // - if (chain->currentHandle == h) { chain->currentHandle = NULL; chain->currentInfo = NULL; } + if (chain->currentHandle == h) + { + chain->currentHandle = NULL; chain->currentInfo = NULL; + } ILibLinkedList_Remove(node); chain->UnblockFlag = 1; } @@ -3292,7 +3418,7 @@ ILibExportMethod void ILibStartChain(void *Chain) int x = 0; DWORD waitTimeout = 0; - ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles); + ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles, NULL); slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout); #else slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); diff --git a/microstack/ILibParsers.h b/microstack/ILibParsers.h index 2a34273..21047e1 100644 --- a/microstack/ILibParsers.h +++ b/microstack/ILibParsers.h @@ -974,7 +974,9 @@ int ILibIsRunningOnChainThread(void* chain); void *ILibChain_GetObjectForDescriptor(void *chain, int fd); char *ILibChain_GetMetaDataFromDescriptorSet(void *chain, fd_set *inr, fd_set *inw, fd_set *ine); #ifdef WIN32 + typedef void(*ILib_GenericReadHandler)(char *buffer, int bufferLen, int* bytesConsumed, void* user1, void *user2); typedef BOOL(*ILibChain_ReadEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user); + typedef BOOL(*ILibChain_WriteEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, int bytesWritten, void* user); typedef struct ILibChain_ReadEx_data { char *buffer; @@ -983,9 +985,20 @@ int ILibIsRunningOnChainThread(void* chain); OVERLAPPED *p; void *user; }ILibChain_ReadEx_data; + typedef struct ILibChain_WriteEx_data + { + ILibChain_WriteEx_Handler handler; + char *buffer; + int bytesLeft; + int totalWritten; + HANDLE fileHandle; + OVERLAPPED *p; + void *user; + }ILibChain_WriteEx_data; void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user); void ILibChain_RemoveWaitHandle(void *chain, HANDLE h); void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user); + ILibTransport_DoneState ILibChain_WriteEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_WriteEx_Handler handler, void *user); #define tv2LTtv1(ptv1, ptv2) ((ptv2)->tv_sec < (ptv1)->tv_sec || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec < (ptv1)->tv_usec)) #define tv2LTEtv1(ptv1, ptv2) (tv2LTtv1(ptv2,ptv1) || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec <= (ptv1)->tv_usec)) #define tvnonzero(ptv) ((ptv)->tv_sec != 0 || (ptv)->tv_usec != 0) @@ -993,7 +1006,11 @@ int ILibIsRunningOnChainThread(void* chain); ILibExportMethod void ILibStartChain(void *chain); ILibExportMethod void ILibStopChain(void *chain); +#ifdef WIN32 + ILibExportMethod void ILibChain_Continue(void *chain, ILibChain_Link **modules, int moduleCount, int maxTimeout, HANDLE **handles); +#else ILibExportMethod void ILibChain_Continue(void *chain, ILibChain_Link **modules, int moduleCount, int maxTimeout); +#endif ILibExportMethod void ILibChain_EndContinue(void *chain); ILibChain_ContinuationStates ILibChain_GetContinuationState(void *chain); #define ILibChain_FreeLink(link) ((ILibChain_Link*)link)->RESERVED = 0xFFFFFFFF;free(link); diff --git a/microstack/ILibProcessPipe.c b/microstack/ILibProcessPipe.c index dece3fb..ef292cf 100644 --- a/microstack/ILibProcessPipe.c +++ b/microstack/ILibProcessPipe.c @@ -1318,6 +1318,16 @@ void ILibProcessPipe_Process_AddHandlers(ILibProcessPipe_Process module, int buf #endif } } +#ifdef WIN32 +void ILibProcessPipe_Process_GetWaitHandles(ILibProcessPipe_Process p, HANDLE *hProcess, HANDLE *read, HANDLE *write, HANDLE *error) +{ + ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)p; + *hProcess = j->hProcess; + *read = j->stdOut->mOverlapped->hEvent; + *error = j->stdErr->mOverlapped->hEvent; + *write = j->stdIn->mOverlapped->hEvent; +} +#endif void ILibProcessPipe_Pipe_Close(ILibProcessPipe_Pipe po) { ILibProcessPipe_PipeObject* pipeObject = (ILibProcessPipe_PipeObject*)po; diff --git a/microstack/ILibProcessPipe.h b/microstack/ILibProcessPipe.h index ce7bccd..81aa8a3 100644 --- a/microstack/ILibProcessPipe.h +++ b/microstack/ILibProcessPipe.h @@ -86,6 +86,9 @@ void ILibProcessPipe_Process_RemoveHandlers(ILibProcessPipe_Process module); void ILibProcessPipe_Process_UpdateUserObject(ILibProcessPipe_Process module, void *userObj); ILibTransport_DoneState ILibProcessPipe_Process_WriteStdIn(ILibProcessPipe_Process p, char* buffer, int bufferLen, ILibTransport_MemoryOwnership ownership); void ILibProcessPipe_Process_CloseStdIn(ILibProcessPipe_Process p); +#ifdef WIN32 +void ILibProcessPipe_Process_GetWaitHandles(ILibProcessPipe_Process p, HANDLE *hProcess, HANDLE *read, HANDLE *write, HANDLE *error); +#endif void ILibProcessPipe_Pipe_Close(ILibProcessPipe_Pipe po); void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject);