From 9668b8d77d1486cd86c048fe71e6d529d929812a Mon Sep 17 00:00:00 2001 From: Bryan Roe Date: Thu, 7 May 2020 10:29:49 -0700 Subject: [PATCH] Update to use new threading model for windows --- microscript/ILibDuktape_ChildProcess.c | 40 +- microscript/ILibDuktape_net.c | 4 +- microstack/ILibParsers.c | 339 ++++++++-------- microstack/ILibProcessPipe.c | 513 ++----------------------- microstack/ILibProcessPipe.h | 10 - 5 files changed, 226 insertions(+), 680 deletions(-) diff --git a/microscript/ILibDuktape_ChildProcess.c b/microscript/ILibDuktape_ChildProcess.c index 1fe129d..167abc2 100644 --- a/microscript/ILibDuktape_ChildProcess.c +++ b/microscript/ILibDuktape_ChildProcess.c @@ -110,39 +110,15 @@ void ILibDuktape_ChildProcess_SubProcess_ExitHandler(ILibProcessPipe_Process sen ILibDuktape_ChildProcess_SubProcess *p = (ILibDuktape_ChildProcess_SubProcess*)user; if (!ILibMemory_CanaryOK(p)) { return; } -#ifdef WIN32 - if (duk_ctx_context_data(p->ctx)->apc_flags == 0 && p->dispatchFlags == 0) - { - // This method was called with an APC, but this thread was running an unknown alertable method when it was interrupted - // So we must unwind the stack, and use a non-apc method to re-dispatch to this thread, becuase we can't risk - // calling a winsock method, in case this thread was inside winsock when it was interrupted, because otherwise, it - // will corrupt memory, resulting in a possible crash. - // - // We had to do the APC first, becuase otherwise child_process.waitExit() will not work, becuase that method is blocking - // the event loop thread with an alertable wait object, so APC is the only way to propagate this event - p->exitCode = exitCode; - p->dispatchFlags = 1; - Duktape_RunOnEventLoop(p->chain, duk_ctx_nonce(p->ctx), p->ctx, ILibDuktape_ChildProcess_SubProcess_ExitHandler_sink1, NULL, p); - return; - } -#endif - p->exitCode = exitCode; p->childProcess = NULL; duk_push_heapptr(p->ctx, p->subProcess); // [childProcess] -#ifdef WIN32 - HANDLE exitptr = (HANDLE)Duktape_GetPointerProperty(p->ctx, -1, "\xFF_WaitExit"); - if (exitptr != NULL) - { - SetEvent(exitptr); - } -#else if (Duktape_GetIntPropertyValue(p->ctx, -1, "\xFF_WaitExit", 0) != 0) { ILibChain_EndContinue(Duktape_GetChain(p->ctx)); } -#endif + duk_get_prop_string(p->ctx, -1, "emit"); // [childProcess][emit] duk_swap_top(p->ctx, -2); // [emit][this] duk_push_string(p->ctx, "exit"); // [emit][this][exit] @@ -198,25 +174,11 @@ duk_ret_t ILibDuktape_ChildProcess_waitExit(duk_context *ctx) return(ILibDuktape_Error(ctx, "Cannot waitExit() because JS Engine is exiting")); } -#ifdef WIN32 - DWORD result; - HANDLE eptr = CreateEventA(NULL, TRUE, FALSE, NULL); - duk_push_pointer(ctx, (void*)eptr); -#else duk_push_int(ctx, 1); // [spawnedProcess][flag] -#endif duk_put_prop_string(ctx, -2, "\xFF_WaitExit"); // [spawnedProcess] -#ifdef WIN32 - duk_ctx_context_data(ctx)->apc_flags = 1; - while ((result=WaitForSingleObjectEx(eptr, duk_is_number(ctx, 0) ? duk_require_int(ctx, 0) : INFINITE, TRUE)) != WAIT_OBJECT_0 && result != WAIT_TIMEOUT); - duk_ctx_context_data(ctx)->apc_flags = 0; - CloseHandle(eptr); - if (result == WAIT_TIMEOUT) { return(ILibDuktape_Error(ctx, "timeout")); } -#else void *mods[] = { ILibGetBaseTimer(Duktape_GetChain(ctx)), Duktape_GetPointerProperty(ctx, -1, ILibDuktape_ChildProcess_Manager) }; ILibChain_Continue(chain, (ILibChain_Link**)mods, 2, -1); -#endif return(0); } diff --git a/microscript/ILibDuktape_net.c b/microscript/ILibDuktape_net.c index 85ab52f..7a7166b 100644 --- a/microscript/ILibDuktape_net.c +++ b/microscript/ILibDuktape_net.c @@ -867,7 +867,6 @@ void ILibDuktape_net_server_IPC_readsink(ILibProcessPipe_Pipe sender, void *user // Server IPC, so we can create a new Instance, and listen for a connection duk_context *ctx = winIPC->ctx; // We need to dereference this, because winIPC will go out of scope when we call listen CloseHandle(winIPC->overlapped.hEvent); winIPC->overlapped.hEvent = NULL; - if (winIPC->buffer != NULL) { free(winIPC->buffer); winIPC->buffer = NULL; } duk_push_heapptr(winIPC->ctx, winIPC->mSocket); // [connection] duk_del_prop_string(ctx, -1, ILibDuktape_net_WindowsIPC_Buffer); duk_pop(winIPC->ctx); // ... @@ -1055,7 +1054,6 @@ duk_ret_t ILibDuktape_net_server_IPC_ConnectSink_Finalizer(duk_context *ctx) ILibProcessPipe_FreePipe(winIPC->mPipe); winIPC->mPipe = NULL; winIPC->mPipeHandle = NULL; } - if (winIPC->buffer != NULL) { free(winIPC->buffer); winIPC->buffer = NULL; } } return(0); } @@ -1317,7 +1315,7 @@ duk_ret_t ILibDuktape_net_server_Finalizer(duk_context *ctx) ILibDuktape_net_WindowsIPC *ipc = Duktape_GetBufferProperty(ctx, 0, ILibDuktape_net_WindowsIPC_Buffer); if (ipc != NULL && ipc->overlapped.hEvent != NULL) { - ILibProcessPipe_WaitHandle_Remove(ipc->manager, ipc->overlapped.hEvent); + ILibChain_RemoveWaitHandle(duk_ctx_chain(ctx), ipc->overlapped.hEvent); if (ipc->mPipeHandle != NULL) { CloseHandle(ipc->mPipeHandle); ipc->mPipeHandle = NULL; } if (ipc->overlapped.hEvent != NULL) { CloseHandle(ipc->overlapped.hEvent); ipc->overlapped.hEvent = NULL; } } diff --git a/microstack/ILibParsers.c b/microstack/ILibParsers.c index 7886e4c..e95b0c9 100644 --- a/microstack/ILibParsers.c +++ b/microstack/ILibParsers.c @@ -1998,6 +1998,181 @@ void ILibChain_UpdateEventHook(ILibChain_EventHookToken token, int maxTimeout) memset(hook, 0, sizeof(ILibChain_Link_Hook)); } } + + +#ifdef WIN32 +int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_set *errorset, HANDLE *waitList, int waitListCount, DWORD waitTimeout) +{ + int slct = -1; + int i; + struct timeval currentTime; + struct timeval tv; + + if (waitListCount == 0) + { + SleepEx(waitTimeout, TRUE); + slct = -1; + } + else + { + while ((slct = WaitForMultipleObjectsEx(waitListCount, waitList, FALSE, waitTimeout, TRUE)) == WAIT_IO_COMPLETION && ((ILibBaseChain*)chain)->UnblockFlag == 0) {} + ILibGetTimeOfDay(¤tTime); + if (slct != WAIT_IO_COMPLETION && (slct - (int)WAIT_OBJECT_0 >= 0) && (slct - (int)WAIT_OBJECT_0 < waitListCount)) + { + if (waitList[ILibChain_HandleInfoIndex(slct)] != NULL) + { + ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(slct)]; + HANDLE h = waitList[slct]; + waitList[ILibChain_HandleInfoIndex(slct)] = NULL; + waitList[slct] = NULL; + if (info->handler != NULL) + { + if (info->handler(chain, h, ILibWaitHandle_ErrorStatus_NONE, info->user) == FALSE) + { + // FALSE means to remove tha HANDLE + if (ILibMemory_CanaryOK(info)) + { + ILibLinkedList_Remove(info->node); + } + } + } + } + } + if (slct == WAIT_TIMEOUT) + { + for (i = 0; i < waitListCount; ++i) + { + if (waitList[ILibChain_HandleInfoIndex(i)] != NULL && tvnonzero(&(((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->expiration))) + { + if (tv2LTEtv1(¤tTime, &(((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->expiration))) + { + // TIMEOUT occured + if (((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->handler != NULL) + { + ((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->handler(chain, waitList[i], ILibWaitHandle_ErrorStatus_TIMEOUT, ((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->user); + } + ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->node); + waitList[i] = NULL; + waitList[ILibChain_HandleInfoIndex(i)] = NULL; + } + } + } + } + if (slct == WAIT_FAILED) + { + // One of the handles is invalid... Kick it out + for (i = 0; i < waitListCount; ++i) + { + if (waitList[ILibChain_HandleInfoIndex(i)] != NULL) + { + if (WaitForSingleObject(waitList[i], 0) == WAIT_FAILED) + { + if (((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->handler != NULL) + { + ((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->handler(chain, waitList[i], ILibWaitHandle_ErrorStatus_INVALID_HANDLE, ((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->user); + } + ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(i)])->node); + waitList[i] = NULL; + waitList[ILibChain_HandleInfoIndex(i)] = NULL; + } + } + } + } + tv.tv_sec = 0; tv.tv_usec = 0; + slct = select(FD_SETSIZE, readset, writeset, errorset, &tv); + ((ILibBaseChain*)chain)->UnblockFlag = 0; + } + return(slct); +} +void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, struct timeval *tv, DWORD *timeout, fd_set *readset, fd_set *writeset, fd_set *errorset, ILibLinkedList handleList) +{ + HANDLE selectHandles[FD_SETSIZE]; + memset(selectHandles, 0, sizeof(selectHandles)); + + if (readset->fd_count == 0 && writeset->fd_count == 0 && ILibLinkedList_GetNode_Head(handleList) == NULL) + { + *waitListCount = 0; + return; + } + + void *node; + struct timeval currentTime; + struct timeval expirationTime; + int i; + int x = 0; + long flags; + for (i = 0; i < (int)readset->fd_count; ++i) + { + selectHandles[x++] = (HANDLE)readset->fd_array[i]; + } + for (i = 0; i < (int)writeset->fd_count; ++i) + { + if (!FD_ISSET(writeset->fd_array[i], readset)) + { + selectHandles[x++] = (HANDLE)writeset->fd_array[i]; + } + } + for (i = 0; i < (int)errorset->fd_count; ++i) + { + if (!FD_ISSET(errorset->fd_array[i], readset) && !FD_ISSET(errorset->fd_array[i], writeset)) + { + selectHandles[x++] = (HANDLE)errorset->fd_array[i]; + } + } + for (i = 0; i < x; ++i) + { + if (waitList[i] == NULL || waitList[ILibChain_HandleInfoIndex(i)] != NULL) + { + waitList[i] = WSACreateEvent(); + } + else + { + WSAResetEvent(waitList[i]); + } + flags = 0; + waitList[ILibChain_HandleInfoIndex(i)] = NULL; + + if (FD_ISSET(selectHandles[i], readset)) { flags |= (FD_READ | FD_ACCEPT); } + if (FD_ISSET(selectHandles[i], writeset)) { flags |= (FD_WRITE | FD_CONNECT); } + if (FD_ISSET(selectHandles[i], errorset)) { flags |= FD_CLOSE; } + WSAEventSelect((SOCKET)selectHandles[i], waitList[i], flags); + } + ILibGetTimeOfDay(¤tTime); + memcpy_s(&expirationTime, sizeof(struct timeval), ¤tTime, sizeof(struct timeval)); + expirationTime.tv_sec += tv->tv_sec; + expirationTime.tv_usec += tv->tv_usec; + node = ILibLinkedList_GetNode_Head(handleList); + while (node != NULL) + { + i = x++; + if (waitList[i] != NULL && waitList[ILibChain_HandleInfoIndex(i)] == NULL) + { + WSACloseEvent(waitList[i]); + } + waitList[i] = (HANDLE)ILibLinkedList_GetDataFromNode(node); + waitList[ILibChain_HandleInfoIndex(i)] = (HANDLE)ILibMemory_Extra(node); + if (((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec != 0 || ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec != 0) + { + // Timeout was specified + if (tv2LTtv1(&expirationTime, &(((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration))) + { + expirationTime.tv_sec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec; + expirationTime.tv_usec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec; + + // If the expiration happens in the past, we need to set the timeout to zero + if (tv2LTtv1(&expirationTime, ¤tTime)) { expirationTime.tv_sec = currentTime.tv_sec; expirationTime.tv_usec = currentTime.tv_usec; } + } + } + node = ILibLinkedList_GetNextNode(node); + } + expirationTime.tv_sec -= currentTime.tv_sec; if (expirationTime.tv_sec < 0) { expirationTime.tv_sec = 0; } + expirationTime.tv_usec -= currentTime.tv_usec; if (expirationTime.tv_usec < 0) { expirationTime.tv_usec = 0; } + *timeout = (DWORD)((expirationTime.tv_sec * 1000) + (expirationTime.tv_usec / 0.001)); + *waitListCount = x; +} +#endif + + ILibChain_ContinuationStates ILibChain_GetContinuationState(void *chain) { return(((ILibBaseChain*)chain)->continuationState); @@ -2090,15 +2265,11 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, // chain->PreSelectCount++; #ifdef WIN32 - if (readset.fd_count == 0 && writeset.fd_count == 0) - { - SleepEx((DWORD)chain->selectTimeout, TRUE); // If there is no pending IO, we must force the thread into an alertable wait state, so ILibForceUnblockChain can function. - slct = -1; - } - else - { - slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); - } + int x = 0; + DWORD waitTimeout = 0; + + ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles); + slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout); #else slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); #endif @@ -2902,13 +3073,9 @@ ILibExportMethod void ILibStartChain(void *Chain) fd_set writeset; #ifdef WIN32 - void *node; - DWORD waitTimeout; HANDLE selectHandles[FD_SETSIZE]; memset(selectHandles, 0, sizeof(selectHandles)); memset(chain->WaitHandles, 0, sizeof(chain->WaitHandles)); - struct timeval currentTime; - struct timeval expirationTime; #endif struct timeval tv; int slct; @@ -3034,148 +3201,11 @@ ILibExportMethod void ILibStartChain(void *Chain) // chain->PreSelectCount++; #ifdef WIN32 - if (readset.fd_count == 0 && writeset.fd_count == 0 && ILibLinkedList_GetNode_Head(chain->auxSelectHandles)==NULL) - { - SleepEx((DWORD)chain->selectTimeout, TRUE); // If there is no pending IO, we must force the thread into an alertable wait state, so ILibForceUnblockChain can function. - slct = -1; - } - else - { - int i; - int x = 0; - long flags; - for (i = 0; i < (int)readset.fd_count; ++i) - { - selectHandles[x++] = (HANDLE)readset.fd_array[i]; - } - for (i = 0; i < (int)writeset.fd_count; ++i) - { - if (!FD_ISSET(writeset.fd_array[i], &readset)) - { - selectHandles[x++] = (HANDLE)writeset.fd_array[i]; - } - } - for (i = 0; i < (int)errorset.fd_count; ++i) - { - if (!FD_ISSET(errorset.fd_array[i], &readset) && !FD_ISSET(errorset.fd_array[i], &writeset)) - { - selectHandles[x++] = (HANDLE)errorset.fd_array[i]; - } - } - for (i = 0; i < x; ++i) - { - if (chain->WaitHandles[i] == NULL || chain->WaitHandles[ILibChain_HandleInfoIndex(i)] != NULL) - { - chain->WaitHandles[i] = WSACreateEvent(); - } - else - { - WSAResetEvent(chain->WaitHandles[i]); - } - flags = 0; - chain->WaitHandles[ILibChain_HandleInfoIndex(i)] = NULL; + int x = 0; + DWORD waitTimeout = 0; - if (FD_ISSET(selectHandles[i], &readset)) { flags |= (FD_READ | FD_ACCEPT); } - if (FD_ISSET(selectHandles[i], &writeset)) { flags |= (FD_WRITE | FD_CONNECT); } - if (FD_ISSET(selectHandles[i], &errorset)) { flags |= FD_CLOSE; } - WSAEventSelect((SOCKET)selectHandles[i], chain->WaitHandles[i], flags); - } - ILibGetTimeOfDay(¤tTime); - memcpy_s(&expirationTime, sizeof(struct timeval), ¤tTime, sizeof(struct timeval)); - expirationTime.tv_sec += tv.tv_sec; - expirationTime.tv_usec += tv.tv_usec; - node = ILibLinkedList_GetNode_Head(chain->auxSelectHandles); - while (node != NULL) - { - i = x++; - if (chain->WaitHandles[i] != NULL && chain->WaitHandles[ILibChain_HandleInfoIndex(i)] == NULL) - { - WSACloseEvent(chain->WaitHandles[i]); - } - chain->WaitHandles[i] = (HANDLE)ILibLinkedList_GetDataFromNode(node); - chain->WaitHandles[ILibChain_HandleInfoIndex(i)] = (HANDLE)ILibMemory_Extra(node); - if (((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec != 0 || ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec != 0) - { - // Timeout was specified - if (tv2LTtv1(&expirationTime, &(((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration))) - { - expirationTime.tv_sec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec; - expirationTime.tv_usec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec; - - // If the expiration happens in the past, we need to set the timeout to zero - if (tv2LTtv1(&expirationTime, ¤tTime)) { expirationTime.tv_sec = currentTime.tv_sec; expirationTime.tv_usec = currentTime.tv_usec; } - } - } - node = ILibLinkedList_GetNextNode(node); - } - expirationTime.tv_sec -= currentTime.tv_sec; if (expirationTime.tv_sec < 0) { expirationTime.tv_sec = 0; } - expirationTime.tv_usec -= currentTime.tv_usec; if (expirationTime.tv_usec < 0) { expirationTime.tv_usec = 0; } - waitTimeout = (DWORD)((expirationTime.tv_sec * 1000) + (expirationTime.tv_usec / 0.001)); - - while ((slct = WaitForMultipleObjectsEx(x, chain->WaitHandles, FALSE, waitTimeout, TRUE)) == WAIT_IO_COMPLETION && chain->UnblockFlag == 0) {} - ILibGetTimeOfDay(¤tTime); - if (slct != WAIT_IO_COMPLETION && (slct - (int)WAIT_OBJECT_0 >= 0) && (slct - (int)WAIT_OBJECT_0 < x)) - { - if (chain->WaitHandles[ILibChain_HandleInfoIndex(slct)] != NULL) - { - ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(slct)]; - HANDLE h = chain->WaitHandles[slct]; - chain->WaitHandles[ILibChain_HandleInfoIndex(slct)] = NULL; - chain->WaitHandles[slct] = NULL; - if (info->handler != NULL) - { - if (info->handler(chain, h, ILibWaitHandle_ErrorStatus_NONE, info->user) == FALSE) - { - // FALSE means to remove tha HANDLE - ILibLinkedList_Remove(info->node); - } - } - } - } - if (slct == WAIT_TIMEOUT) - { - for (i = 0; i < x; ++i) - { - if (chain->WaitHandles[ILibChain_HandleInfoIndex(i)] != NULL && tvnonzero(&(((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->expiration))) - { - if (tv2LTEtv1(¤tTime, &(((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->expiration))) - { - // TIMEOUT occured - if (((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->handler != NULL) - { - ((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->handler(chain, chain->WaitHandles[i], ILibWaitHandle_ErrorStatus_TIMEOUT, ((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->user); - } - ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->node); - chain->WaitHandles[i] = NULL; - chain->WaitHandles[ILibChain_HandleInfoIndex(i)] = NULL; - } - } - } - } - if (slct == WAIT_FAILED) - { - // One of the handles is invalid... Kick it out - for (i = 0; i < x; ++i) - { - if (chain->WaitHandles[ILibChain_HandleInfoIndex(i)] != NULL) - { - if (WaitForSingleObject(chain->WaitHandles[i], 0) == WAIT_FAILED) - { - if (((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->handler != NULL) - { - ((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->handler(chain, chain->WaitHandles[i], ILibWaitHandle_ErrorStatus_INVALID_HANDLE, ((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->user); - } - ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)chain->WaitHandles[ILibChain_HandleInfoIndex(i)])->node); - chain->WaitHandles[i] = NULL; - chain->WaitHandles[ILibChain_HandleInfoIndex(i)] = NULL; - } - } - } - } - tv.tv_sec = 0; tv.tv_usec = 0; - slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); - chain->UnblockFlag = 0; - } + ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles); + slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout); #else slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv); #endif @@ -7176,6 +7206,7 @@ void* ILibLinkedList_InsertAfter(void *LinkedList_Node, void *data) */ void* ILibLinkedList_Remove(void *LinkedList_Node) { + if (!ILibMemory_CanaryOK(LinkedList_Node)) { return(NULL); } struct ILibLinkedListNode_Root *r; struct ILibLinkedListNode *n; void* RetVal; diff --git a/microstack/ILibProcessPipe.c b/microstack/ILibProcessPipe.c index e507c00..f805442 100644 --- a/microstack/ILibProcessPipe.c +++ b/microstack/ILibProcessPipe.c @@ -87,7 +87,6 @@ typedef struct ILibProcessPipe_PipeObject ILibProcessPipe_GenericBrokenPipeHandler brokenPipeHandler; void *user1, *user2; #ifdef WIN32 - int usingCompletionRoutine; int cancelInProgress; HANDLE mPipe_Reader_ResumeEvent; HANDLE mPipe_ReadEnd; @@ -166,298 +165,7 @@ ILibProcessPipe_Pipe ILibProcessPipe_Process_GetStdOut(ILibProcessPipe_Process p } #ifdef WIN32 -BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user); -typedef struct ILibProcessPipe_WaitHandle -{ - ILibProcessPipe_Manager_Object *parent; - HANDLE event; - void *user; - ILibProcessPipe_WaitHandle_Handler callback; - int timeRemaining; - int timeout; -}ILibProcessPipe_WaitHandle; -typedef struct ILibProcessPipe_WaitHandle_APC -{ - HANDLE callingThread; - HANDLE ev; - ILibWaitHandle_ErrorStatus status; - ILibProcessPipe_WaitHandle_Handler callback; - void *chain; - void *user; -}ILibProcessPipe_WaitHandle_APC; -HANDLE ILibProcessPipe_Manager_GetWorkerThread(ILibProcessPipe_Manager mgr) -{ - return(((ILibProcessPipe_Manager_Object*)mgr)->workerThread); -} - -int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer(void *source, void *matchWith) -{ - if (source == NULL) { return 1; } - return(((ILibProcessPipe_WaitHandle*)source)->event == matchWith ? 0 : 1); -} - -void __stdcall ILibProcessPipe_WaitHandle_Remove_APC(ULONG_PTR obj) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)((void**)obj)[0]; - HANDLE event = (HANDLE)((void**)obj)[1]; - void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, event); - ILibProcessPipe_WaitHandle *waiter; - - if (node != NULL) - { - waiter = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node); - if (waiter->callback != NULL) { waiter->callback(waiter->event, ILibWaitHandle_ErrorStatus_REMOVED, waiter->user); } - free(waiter); - ILibLinkedList_Remove(node); - SetEvent(manager->updateEvent); - } - free((void*)obj); -} -void ILibProcessPipe_WaitHandle_Remove(ILibProcessPipe_Manager mgr, HANDLE event) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; - void **data = (void**)ILibMemory_Allocate(2 * sizeof(void*), 0, NULL, NULL); - data[0] = manager; - data[1] = event; - - QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, manager->workerThread, (ULONG_PTR)data); -} -void __stdcall ILibProcessPipe_WaitHandle_Add_APC(ULONG_PTR obj) -{ - ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)obj; - if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; } - ILibLinkedList_AddTail(waitHandle->parent->ActivePipes, waitHandle); -} -void ILibProcessPipe_WaitHandle_AddEx(ILibProcessPipe_Manager mgr, ILibProcessPipe_WaitHandle *waitHandle) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; - if (manager->workerThreadID == GetCurrentThreadId()) - { - // We're on the same thread, so we can just add it in - if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; } - ILibLinkedList_AddTail(manager->ActivePipes, waitHandle); - SetEvent(manager->updateEvent); - } - else - { - // We're on a different thread, so we need to dispatch to the worker thread - QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add_APC, manager->workerThread, (ULONG_PTR)waitHandle); - } -} -void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; - ILibProcessPipe_WaitHandle *waitHandle; - waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_Allocate(sizeof(ILibProcessPipe_WaitHandle), 0, NULL, NULL); - - waitHandle->parent = manager; - waitHandle->event = event; - waitHandle->user = user; - waitHandle->callback = callback; - waitHandle->timeout = milliseconds; - - ILibProcessPipe_WaitHandle_AddEx(mgr, waitHandle); -} - -void ILibProcessPipe_Manager_WindowsRunLoopEx(void *arg) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)arg; - HANDLE hList[FD_SETSIZE * (2*sizeof(HANDLE))]; - ILibLinkedList active = manager->ActivePipes; - uint64_t timestamp1; - DWORD elapsedTime = 0; - void* node; - ILibProcessPipe_WaitHandle* data; - - int i, jx; - DWORD x; - DWORD maxTimeout = INFINITE; - memset(hList, 0, sizeof(HANDLE)*FD_SETSIZE); - manager->workerThreadID = GetCurrentThreadId(); - - while (manager->abort == 0) - { - hList[0] = manager->updateEvent; - i = 1; - - //Prepare the rest of the WaitHandle Array, for the WaitForMultipleObject call - maxTimeout = INFINITE; - node = ILibLinkedList_GetNode_Head(active); - while (node != NULL) - { - if ((data = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node)) != NULL) - { - ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): [%p]", data); - - hList[i] = data->event; - hList[i + FD_SETSIZE] = (HANDLE)data; - if (data->timeout > 0) - { - if (elapsedTime > 0) - { - // We popped out of the wait, but didn't decrement timeRemaining yet - if (data->timeRemaining < (int)elapsedTime) - { - data->timeRemaining = 0; - } - else - { - data->timeRemaining -= (int)elapsedTime; - } - } - if ((DWORD)data->timeRemaining < maxTimeout) { maxTimeout = (DWORD)data->timeRemaining; } - } - ++i; - } - node = ILibLinkedList_GetNextNode(node); - } - - ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): Number of Handles: %d", i); - timestamp1 = (uint64_t)ILibGetUptime(); - - while ((x = WaitForMultipleObjectsEx(i, hList, FALSE, maxTimeout, TRUE)) != WAIT_IO_COMPLETION) - { - elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); - if (x == WAIT_FAILED || x == WAIT_TIMEOUT || (x-WAIT_OBJECT_0) == 0) { break; } - data = (ILibProcessPipe_WaitHandle*)hList[x + FD_SETSIZE]; - manager->activeWaitHandle = data; - if (data != NULL && data->callback != NULL) - { - if (data->callback(data->event, ILibWaitHandle_ErrorStatus_NONE, data->user) == FALSE) - { - // FALSE means to remove the WaitHandle - void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event); - if (node != NULL) - { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - break; - } - } - } - - if (maxTimeout != INFINITE) - { - // Need to recalculate maxTimeout - for (jx = 1; jx < i; ++jx) - { - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data->timeout > 0) // Timeout was specified - { - if (data->timeRemaining > (int)elapsedTime) // We can just decrement time left - { - data->timeRemaining -= (int)elapsedTime; - } - else - { - data->timeRemaining = 0; // Already expired - } - if (data->timeRemaining < (int)maxTimeout) - { - maxTimeout = (DWORD)data->timeRemaining; // Set the new maxTimeout - } - } - } - } - } - if (x == WAIT_IO_COMPLETION) { elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); } - if (x == WAIT_FAILED) - { - for (jx = 1; jx < i; ++jx) - { - if (WaitForSingleObject(hList[jx], 0) == WAIT_FAILED) - { - // This handle is invalid, so let's error it out and remove it - node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, hList[jx]); - if (node != NULL) - { - // Propagate an Error up - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data != NULL && data->callback != NULL) - { - manager->activeWaitHandle = data; - data->callback(data->event, ILibWaitHandle_ErrorStatus_INVALID_HANDLE, data->user); - } - - // Remove the wait handle - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - break; - } - } - } - } - if (x == WAIT_TIMEOUT) - { - for (jx = 1; jx < i; ++jx) - { - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data->timeout > 0) - { - // A timeout was specified, so lets check it - if (data->timeRemaining <= (int)elapsedTime) - { - data->timeRemaining = 0; - } - else - { - data->timeRemaining -= (int)elapsedTime; - } - if (data->timeRemaining == 0) - { - manager->activeWaitHandle = data; - if (data->callback == NULL || data->callback(data->event, ILibWaitHandle_ErrorStatus_TIMEOUT, data->user) == FALSE) - { - // Remove Wait Handle - node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event); - if (node != NULL) - { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - } - } - else - { - // Reset timeout - data->timeRemaining = data->timeout; - } - } - } - } - elapsedTime = 0; // Set this to zero, becuase we already decremented everyone's time remaining - } - ResetEvent(manager->updateEvent); - } - - // - // We need to enumerate the list of handles, and free them, because now that this thread - // is not in an alertable state, the parent will not be able to Queue any actions for cleanup - // - while ((node = ILibLinkedList_GetNode_Head(manager->ActivePipes)) != NULL) - { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - } -} -ILibProcessPipe_Manager_WindowsRunLoop(void *arg) -{ - ILib_DumpEnabledContext winException; - __try - { - ILibProcessPipe_Manager_WindowsRunLoopEx(arg); - } - __except (ILib_WindowsExceptionFilterEx(GetExceptionCode(), GetExceptionInformation(), &winException)) - { - ILib_WindowsExceptionDebugEx(&winException); - } -} -void ILibProcessPipe_Manager_Start(void* chain, void* user) -{ - ILibProcessPipe_Manager_Object* man = (ILibProcessPipe_Manager_Object*)user; - - UNREFERENCED_PARAMETER(chain); - man->workerThread = ILibSpawnNormalThread((voidfp1)&ILibProcessPipe_Manager_WindowsRunLoop, user); -} +BOOL ILibProcessPipe_Process_OnExit(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user); #else void ILibProcessPipe_Process_ReadHandler(void* user); void ILibProcessPipe_Manager_OnPreSelect(void* object, fd_set *readset, fd_set *writeset, fd_set *errorset, int* blocktime) @@ -526,10 +234,7 @@ ILibProcessPipe_Manager ILibProcessPipe_Manager_Create(void *chain) retVal->ChainLink.ParentChain = chain; retVal->ActivePipes = ILibLinkedList_CreateEx(sizeof(int)); -#ifdef WIN32 - retVal->updateEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - ILibChain_RunOnMicrostackThread(chain, ILibProcessPipe_Manager_Start, retVal); -#else +#ifndef WIN32 retVal->ChainLink.PreSelectHandler = &ILibProcessPipe_Manager_OnPreSelect; retVal->ChainLink.PostSelectHandler = &ILibProcessPipe_Manager_OnPostSelect; #endif @@ -558,10 +263,22 @@ void ILibProcessPipe_FreePipe(ILibProcessPipe_PipeObject *pipeObject) { CloseHandle(pipeObject->mPipe_WriteEnd); } - if (pipeObject->mOverlapped != NULL && pipeObject->usingCompletionRoutine == 0) { CloseHandle(pipeObject->mOverlapped->hEvent); free(pipeObject->mOverlapped); } - if (pipeObject->mwOverlapped != NULL) { free(pipeObject->mwOverlapped); } + if (pipeObject->mOverlapped != NULL) + { + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent); + CloseHandle(pipeObject->mOverlapped->hEvent); free(pipeObject->mOverlapped); + } + if (pipeObject->mwOverlapped != NULL) + { + if (pipeObject->mwOverlapped->hEvent != NULL) + { + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mwOverlapped->hEvent); + CloseHandle(pipeObject->mwOverlapped->hEvent); + } + free(pipeObject->mwOverlapped); + } if (pipeObject->mPipe_Reader_ResumeEvent != NULL) { CloseHandle(pipeObject->mPipe_Reader_ResumeEvent); } - if (pipeObject->buffer != NULL && pipeObject->usingCompletionRoutine == 0) { free(pipeObject->buffer); } + if (pipeObject->buffer != NULL) { free(pipeObject->buffer); } #else if (pipeObject->manager != NULL) { @@ -591,11 +308,7 @@ void ILibProcessPipe_FreePipe(ILibProcessPipe_PipeObject *pipeObject) if (pipeObject->mProcess->stdOut == pipeObject) { pipeObject->mProcess->stdOut = NULL; } if (pipeObject->mProcess->stdErr == pipeObject) { pipeObject->mProcess->stdErr = NULL; } } -#ifdef WIN32 - if (pipeObject->usingCompletionRoutine == 0) { ILibMemory_Free(pipeObject); } -#else ILibMemory_Free(pipeObject); -#endif } #ifdef WIN32 @@ -686,48 +399,15 @@ ILibProcessPipe_PipeObject* ILibProcessPipe_CreatePipe(ILibProcessPipe_Manager m return retVal; } -#ifdef WIN32 - -typedef struct ILibProcessPipe_Process_Destroy_WinRunThread_Data -{ - ILibProcessPipe_Process_Object *pj; - HANDLE h; -}ILibProcessPipe_Process_Destroy_WinRunThread_Data; -void __stdcall ILibProcessPipe_Process_Destroy_WinRunThread(ULONG_PTR obj) -{ - ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = (ILibProcessPipe_Process_Destroy_WinRunThread_Data*)obj; - if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj)) - { - if (data->pj->exiting == 0) - { - if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdIn != NULL) { ILibProcessPipe_FreePipe(data->pj->stdIn); } - if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdOut != NULL) { ILibProcessPipe_FreePipe(data->pj->stdOut); } - if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdErr != NULL) { ILibProcessPipe_FreePipe(data->pj->stdErr); } - if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj)) { ILibMemory_Free(data->pj); } - } - } - SetEvent(data->h); -} -#endif void ILibProcessPipe_Process_Destroy(ILibProcessPipe_Process_Object *p) { if (!ILibMemory_CanaryOK(p)) { return; } -#ifdef WIN32 - ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data)); - data->pj = p; - data->h = CreateEvent(NULL, TRUE, FALSE, NULL); - // We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread. - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data); - WaitForSingleObjectEx(data->h, 3000, TRUE); - CloseHandle(data->h); -#else if (p->exiting != 0) { return; } if (p->stdIn != NULL) { ILibProcessPipe_FreePipe(p->stdIn); } if (p->stdOut != NULL) { ILibProcessPipe_FreePipe(p->stdOut); } if (p->stdErr != NULL) { ILibProcessPipe_FreePipe(p->stdErr); } ILibMemory_Free(p); -#endif } #ifndef WIN32 void ILibProcessPipe_Process_BrokenPipeSink_DestroyHandler(void *object) @@ -1073,7 +753,7 @@ void ILibProcessPipe_Pipe_SwapBuffers(ILibProcessPipe_Pipe obj, char* newBuffer, } #ifdef WIN32 -BOOL ILibProcessPipe_Process_ReadHandler(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) +BOOL ILibProcessPipe_Process_ReadHandler(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) #else void ILibProcessPipe_Process_ReadHandler(void* user) #endif @@ -1198,10 +878,7 @@ void ILibProcessPipe_Process_ReadHandler(void* user) // Broken Pipe // ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[ReadHandler]: BrokenPipe(%d) on Pipe: %p", err, (void*)pipeObject); -#ifdef WIN32 - ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop - ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(pipeObject->manager->ActivePipes, NULL, pipeObject)); -#else +#ifndef WIN32 void *pipenode = ILibLinkedList_GetNode_Search(pipeObject->manager->ActivePipes, NULL, pipeObject); if (pipenode != NULL) { @@ -1233,7 +910,7 @@ void ILibProcessPipe_Process_ReadHandler(void* user) #endif } #ifdef WIN32 -BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) +BOOL ILibProcessPipe_Process_WindowsWriteHandler(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) { if (errors != ILibWaitHandle_ErrorStatus_NONE) { return(FALSE); } ILibProcessPipe_PipeObject *pipeObject = (ILibProcessPipe_PipeObject*)user; @@ -1246,11 +923,11 @@ BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_Er if (result == FALSE) { // Broken Pipe - ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent); ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[WriteHandler]: BrokenPipe(%d) on Pipe: %p", GetLastError(), (void*)pipeObject); if (pipeObject->brokenPipeHandler != NULL) { ((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject); } ILibProcessPipe_FreePipe(pipeObject); - return(FALSE); + return(TRUE); } ILibQueue_Lock(pipeObject->WriteBuffer); @@ -1267,16 +944,17 @@ BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_Er // Broken Pipe ILibQueue_UnLock(pipeObject->WriteBuffer); ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[WriteHandler]: BrokenPipe(%d) on Pipe: %p", GetLastError(), (void*)pipeObject); + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent); if (pipeObject->brokenPipeHandler != NULL) { ((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject); } ILibProcessPipe_FreePipe(pipeObject); - return(FALSE); + return(TRUE); } break; } } if (ILibQueue_IsEmpty(pipeObject->WriteBuffer) != 0) { - ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent); ILibQueue_UnLock(pipeObject->WriteBuffer); if (pipeObject->handler != NULL) ((ILibProcessPipe_GenericSendOKHandler)pipeObject->handler)(pipeObject->user1, pipeObject->user2); } @@ -1314,51 +992,7 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject) else { ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData); - if (p->manager->workerThreadID == GetCurrentThreadId()) - { - // Already on the dispatch thread, so we can directly update the list - if (p->mOverlapped_opaqueData == NULL) - { - void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle); - if (node != NULL) - { - ILibLinkedList_Remove(node); - p->mOverlapped_opaqueData = p->manager->activeWaitHandle; - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData); - SetEvent(p->manager->updateEvent); // Just need to set event, so the wait list will be rebuilt - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list"); - } - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Already paused"); - } - } - else - { - // Not on the dispatch thread, so we'll have to dispatch - p->mOverlapped_opaqueData = NULL; - ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent); - } - - //if (p->mOverlapped_opaqueData == NULL) - //{ - // void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle); - // if (node != NULL) - // { - // ILibLinkedList_Remove(node); - // p->mOverlapped_opaqueData = p->manager->activeWaitHandle; - // ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData); - // SetEvent(p->manager->updateEvent); - // } - // else - // { - // ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list"); - // } - //} + ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent); } #else ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject)); @@ -1411,63 +1045,15 @@ void ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(ILibProcessPipe_PipeObject } p->processingLoop = 0; } -#ifdef WIN32 -void __stdcall ILibProcessPipe_Pipe_ResumeEx_APC(ULONG_PTR obj) -{ - ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)obj; - - ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(p); - if (p->PAUSED == 0) - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Adding Pipe [%p]", (void*)p); - if (p->mOverlapped_opaqueData != NULL) - { - ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData); - p->mOverlapped_opaqueData = NULL; - } - else - { - ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler); - } - //printf("ReadFile(%p, %d, %d) (ResumeEx_APC)\n", p->mPipe_ReadEnd, p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead); - if (p->inProgress == 0) - { - p->inProgress = 1; - ReadFile(p->mPipe_ReadEnd, p->buffer + p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead, NULL, p->mOverlapped); - } - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Skipping Pipe [%p]", (void*)p); - } -} -#endif void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p) { if (!ILibMemory_CanaryOK(p)) { return; } ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx(): processingLoop = %d", p->processingLoop); #ifdef WIN32 - if (p->manager->workerThreadID == GetCurrentThreadId()) - { - // We were called from the event dispatch (ie: we're on the same thread) - if (p->mOverlapped_opaqueData != NULL) - { - ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData); - p->mOverlapped_opaqueData = NULL; - } - else - { - ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler); - } - p->PAUSED = 0; - } - else - { - // We're on a different thread, so we need to dispatch to the worker thread - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Pipe_ResumeEx_APC, p->manager->workerThread, (ULONG_PTR)p); - } + ILibChain_AddWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, p); + p->PAUSED = 0; return; #endif @@ -1492,7 +1078,7 @@ void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject) if (p->mProcess != NULL && p->mProcess->hProcess_needAdd != 0 && p->mProcess->disabled == 0) { p->mProcess->hProcess_needAdd = 0; - ILibProcessPipe_WaitHandle_Add(p->manager, p->mProcess->hProcess, p->mProcess, ILibProcessPipe_Process_OnExit); + ILibChain_AddWaitHandle(p->manager->ChainLink.ParentChain, p->mProcess->hProcess, -1, ILibProcessPipe_Process_OnExit, p->mProcess); } } #else @@ -1576,7 +1162,7 @@ void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObj //printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize); pipeObject->inProgress = 1; result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped); - ILibProcessPipe_WaitHandle_Add(pipeObject->manager, pipeObject->mOverlapped->hEvent, pipeObject, &ILibProcessPipe_Process_ReadHandler); + ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject); } else { @@ -1605,11 +1191,6 @@ void ILibProcessPipe_Process_PipeHandler_StdIn(void *user1, void *user2) } #ifdef WIN32 -void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_DestroySink(ULONG_PTR obj) -{ - ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)obj; - if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); } -} void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user) { ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)user; @@ -1617,22 +1198,12 @@ void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user) BOOL result; if (j->disabled != 0) { return; } - if (ILibChain_SelectInterrupted(j->chain) != 0) - { - // Winsock appears to be at a select call, so this must've been triggered by an APC, so we must unroll the callstack to continue, - // because winsock is not re-entrant, so we cannot risk making another winsock call directly. - // - ILibChain_RunOnMicrostackThreadEx2(j->chain, ILibProcessPipe_Process_OnExit_ChainSink, user, 0); - return; - } - result = GetExitCodeProcess(j->hProcess, &exitCode); j->exiting = 1; j->exitHandler(j, exitCode, j->userObject); j->exiting ^= 1; - // We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread. - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j); + if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); } } #ifdef WIN32 void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj) @@ -1640,13 +1211,13 @@ void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj) ILibProcessPipe_Process_OnExit_ChainSink(NULL, (void*)obj); } #endif -BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) +BOOL ILibProcessPipe_Process_OnExit(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user) { ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)user; if (errors != ILibWaitHandle_ErrorStatus_NONE) { return(FALSE); } UNREFERENCED_PARAMETER(event); - ILibProcessPipe_WaitHandle_Remove(j->parent, j->hProcess); + ILibChain_RemoveWaitHandle(j->chain, j->hProcess); if ((j->stdOut->PAUSED != 0 && j->stdOut->totalRead > 0) || (j->stdErr->PAUSED != 0 && j->stdErr->totalRead > 0)) { @@ -1656,19 +1227,14 @@ BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus err { if (j->exitHandler != NULL) { - // Everyone's lives will be made easier, by context switching to chain thread before making this call -#ifdef WIN32 - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_APC, ILibChain_GetMicrostackThreadHandle(j->parent->ChainLink.ParentChain), (ULONG_PTR)user); -#else - ILibChain_RunOnMicrostackThread(j->parent->ChainLink.ParentChain, ILibProcessPipe_Process_OnExit_ChainSink, user); -#endif + ILibProcessPipe_Process_OnExit_ChainSink(j->chain, user); } else { ILibProcessPipe_Process_Destroy(j); } } - return(FALSE); + return(TRUE); } #endif void ILibProcessPipe_Process_UpdateUserObject(ILibProcessPipe_Process module, void *userObj) @@ -1682,7 +1248,7 @@ void ILibProcessPipe_Process_RemoveHandlers(ILibProcessPipe_Process module) if (j != NULL && ILibMemory_CanaryOK(j)) { j->disabled = 1; - ILibProcessPipe_WaitHandle_Remove(j->parent, j->hProcess); + ILibChain_RemoveWaitHandle(j->chain, j->hProcess); } } #endif @@ -1699,7 +1265,7 @@ void ILibProcessPipe_Process_AddHandlers(ILibProcessPipe_Process module, int buf ILibProcessPipe_Process_SetWriteHandler(j->stdIn, &ILibProcessPipe_Process_PipeHandler_StdIn, j, sendOk); #ifdef WIN32 - ILibProcessPipe_WaitHandle_Add(j->parent, j->hProcess, j, &ILibProcessPipe_Process_OnExit); + ILibChain_AddWaitHandle(j->parent->ChainLink.ParentChain, j->hProcess, -1, ILibProcessPipe_Process_OnExit, j); #endif } } @@ -1758,7 +1324,7 @@ ILibTransport_DoneState ILibProcessPipe_Pipe_Write(ILibProcessPipe_Pipe po, char retVal = ILibTransport_DoneState_INCOMPLETE; ILibQueue_EnQueue(pipeObject->WriteBuffer, ILibProcessPipe_WriteData_Create(buffer, bufferLen, ownership)); #ifdef WIN32 - ILibProcessPipe_WaitHandle_Add(pipeObject->manager, pipeObject->mOverlapped->hEvent, pipeObject, &ILibProcessPipe_Process_WindowsWriteHandler); + ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_WindowsWriteHandler, pipeObject); #else ILibLifeTime_Add(ILibGetBaseTimer(pipeObject->manager->ChainLink.ParentChain), pipeObject, 0, &ILibProcessPipe_Process_StartPipeReaderWriterEx, NULL); // Need to context switch to Chain Thread #endif @@ -1779,7 +1345,7 @@ ILibTransport_DoneState ILibProcessPipe_Pipe_Write(ILibProcessPipe_Pipe po, char #ifdef WIN32 if (pipeObject->manager != NULL) { - ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop + ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent); } #endif ((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject); @@ -1895,7 +1461,6 @@ int ILibProcessPipe_Pipe_ReadEx(ILibProcessPipe_Pipe targetPipe, char *buffer, i { if (GetLastError() == ERROR_IO_PENDING) { - j->usingCompletionRoutine = 1; j->buffer = buffer; j->bufferSize = bufferLength; j->user1 = user; diff --git a/microstack/ILibProcessPipe.h b/microstack/ILibProcessPipe.h index 8d4f3a8..ce7bccd 100644 --- a/microstack/ILibProcessPipe.h +++ b/microstack/ILibProcessPipe.h @@ -100,15 +100,5 @@ pid_t ILibProcessPipe_Process_GetPID(ILibProcessPipe_Process p); int ILibProcessPipe_Process_GetPTY(ILibProcessPipe_Process p); #endif - -#ifdef WIN32 -typedef BOOL(*ILibProcessPipe_WaitHandle_Handler)(HANDLE event, ILibWaitHandle_ErrorStatus status, void* user); -void ILibProcessPipe_WaitHandle_Remove(ILibProcessPipe_Manager mgr, HANDLE event); - -// These methods will dispatch the callback on the worker thread -void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback); -#define ILibProcessPipe_WaitHandle_Add(processPipeManager, eventHandle, user, callback) ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(processPipeManager, eventHandle, 0, user, callback) - -#endif #define ILibTransports_ProcessPipe 0x60 #endif