diff --git a/microscript/ILibDuktape_HECI.c b/microscript/ILibDuktape_HECI.c index 0c0236f..e72decb 100644 --- a/microscript/ILibDuktape_HECI.c +++ b/microscript/ILibDuktape_HECI.c @@ -451,7 +451,8 @@ ILibTransport_DoneState ILibDuktape_HECI_Session_WriteSink(ILibDuktape_DuplexStr { #if defined(WIN32) state->returnIgnored = 1; - QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_WriteHandler, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)state); + ILibDuktape_HECI_Session_WriteHandler((ULONG_PTR)state); + //QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_WriteHandler, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)state); #elif defined(_POSIX) if (ILibIsRunningOnChainThread(stream->readableStream->chain) != 0) { @@ -534,12 +535,15 @@ void ILibDuktape_HECI_Session_ResumeSink(ILibDuktape_DuplexStream *sender, void if (session->noPipelining != 0) { ILibChain_RunOnMicrostackThread(sender->readableStream->chain, ILibDuktape_HECI_Session_ResumeSink_NoPipeline, session); - // Note: DO NOT 'return' here, because we still need to QueueUserAPC, to resume the stream on Windows + // Note: DO NOT 'return' here, because we still need to resume the stream on Windows } #ifdef WIN32 - // To Resume, we need to first context switch to the Windows Thread - QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_ResumeSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session); + BOOL result = ReadFile(session->descriptor, session->buffer, (DWORD)session->bufferSize, &(session->bytesRead), &(session->v)); + if (result == TRUE || GetLastError() == ERROR_IO_PENDING) + { + ILibProcessPipe_WaitHandle_Add(session->mgr, session->v.hEvent, session, ILibDuktape_HECI_Session_ReceiveSink); + } #endif } #ifdef WIN32 @@ -645,7 +649,8 @@ duk_ret_t ILibDuktape_HECI_create_OnClientConnect(duk_context *ctx) duk_get_prop_string(ctx, -1, ILibDuktape_HECI_ChildProcess); // [HECI][childProcess] duk_get_prop_string(ctx, -1, ILibDuktape_ChildProcess_Manager); // [HECI][childProcess][manager] session->mgr = (ILibProcessPipe_Manager)duk_get_pointer(ctx, -1); - QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_Start, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session); + ILibDuktape_HECI_Session_Start((ULONG_PTR)session); + //QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_Start, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session); #else duk_push_this(ctx); // [HECI] session->descriptor = Duktape_GetIntPropertyValue(ctx, -1, ILibDuktape_HECI_Descriptor, -1); @@ -727,7 +732,8 @@ duk_ret_t ILibDuktape_HECI_Session_close(duk_context *ctx) ILibProcessPipe_WaitHandle_Remove(session->mgr, session->v.hEvent); ILibProcessPipe_WaitHandle_Remove(session->mgr, session->wv.hEvent); session->stream = NULL; - QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_CloseSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session->descriptor); + CloseHandle(session->descriptor); + //QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_CloseSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session->descriptor); } #else int d = Duktape_GetIntPropertyValue(ctx, -1, ILibDuktape_HECI_Descriptor, -1); @@ -952,7 +958,8 @@ duk_ret_t ILibDuktape_HECI_doIoctl(duk_context *ctx) duk_get_prop_string(ctx, -2, ILibDuktape_HECI_ChildProcess); // [heci][stash][childProcess] duk_get_prop_string(ctx, -1, ILibDuktape_ChildProcess_Manager); // [heci][stash][childProcess][manager] data->pipeManager = (ILibProcessPipe_Manager)duk_get_pointer(ctx, -1); - QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_apc_AddIoctl, ILibProcessPipe_Manager_GetWorkerThread(data->pipeManager), (ULONG_PTR)data); + ILibDuktape_HECI_apc_AddIoctl((ULONG_PTR)data); + //QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_apc_AddIoctl, ILibProcessPipe_Manager_GetWorkerThread(data->pipeManager), (ULONG_PTR)data); #elif defined(_POSIX) ILibDuktape_HECI_AddIoctl(data); #endif diff --git a/microstack/ILibProcessPipe.c b/microstack/ILibProcessPipe.c index 8a53624..ca1e06f 100644 --- a/microstack/ILibProcessPipe.c +++ b/microstack/ILibProcessPipe.c @@ -47,14 +47,6 @@ typedef struct ILibProcessPipe_Manager_Object { ILibChain_Link ChainLink; ILibLinkedList ActivePipes; - -#ifdef WIN32 - int abort; - HANDLE updateEvent; - HANDLE workerThread; - DWORD workerThreadID; - void *activeWaitHandle; -#endif }ILibProcessPipe_Manager_Object; struct ILibProcessPipe_PipeObject; @@ -84,7 +76,7 @@ typedef struct ILibProcessPipe_PipeObject HANDLE mPipe_ReadEnd; HANDLE mPipe_WriteEnd; OVERLAPPED *mOverlapped,*mwOverlapped; - void *mOverlapped_opaqueData, *user3, *user4; + void *user3, *user4; #else int mPipe_ReadEnd, mPipe_WriteEnd; #endif @@ -158,11 +150,12 @@ BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus err typedef struct ILibProcessPipe_WaitHandle { ILibProcessPipe_Manager_Object *parent; - HANDLE event; + HANDLE event, registeredHandle; void *user; ILibProcessPipe_WaitHandle_Handler callback; int timeRemaining; int timeout; + int contextSwitch; }ILibProcessPipe_WaitHandle; typedef struct ILibProcessPipe_WaitHandle_APC { @@ -172,30 +165,34 @@ typedef struct ILibProcessPipe_WaitHandle_APC ILibProcessPipe_WaitHandle_Handler callback; void *user; }ILibProcessPipe_WaitHandle_APC; -HANDLE ILibProcessPipe_Manager_GetWorkerThread(ILibProcessPipe_Manager mgr) -{ - return(((ILibProcessPipe_Manager_Object*)mgr)->workerThread); -} -int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer(void *source, void *matchWith) +void __stdcall ILibProcessPipe_WaitHandle_SignaledOrTimeout(void *user, BOOLEAN TimerOrWaitFired); // Prototype +int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_event_Comparer(void *source, void *matchWith) { if (source == NULL) { return 1; } return(((ILibProcessPipe_WaitHandle*)source)->event == matchWith ? 0 : 1); } - +int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_registeredHandle_Comparer(void *source, void *matchWith) +{ + if (source == NULL) { return 1; } + return(((ILibProcessPipe_WaitHandle*)source)->registeredHandle == matchWith ? 0 : 1); +} void __stdcall ILibProcessPipe_WaitHandle_Remove_APC(ULONG_PTR obj) { ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)((void**)obj)[0]; HANDLE event = (HANDLE)((void**)obj)[1]; - void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, event); + void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_event_Comparer, event); ILibProcessPipe_WaitHandle *waiter; if (node != NULL) { waiter = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node); - free(waiter); - ILibLinkedList_Remove(node); - SetEvent(manager->updateEvent); + if (waiter->registeredHandle != NULL) + { + UnregisterWait(waiter->registeredHandle); waiter->registeredHandle = NULL; + } + ILibMemory_Free(waiter); + ILibLinkedList_Remove(node); } free((void*)obj); } @@ -206,35 +203,57 @@ void ILibProcessPipe_WaitHandle_Remove(ILibProcessPipe_Manager mgr, HANDLE event data[0] = manager; data[1] = event; - QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, manager->workerThread, (ULONG_PTR)data); + QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, ILibChain_GetMicrostackThreadHandle(manager->ChainLink.ParentChain), (ULONG_PTR)data); } -void __stdcall ILibProcessPipe_WaitHandle_Add_APC(ULONG_PTR obj) +void __stdcall ILibProcessPipe_WaitHandle_unregister(ULONG_PTR u) { - ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)obj; - if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; } - ILibLinkedList_AddTail(waitHandle->parent->ActivePipes, waitHandle); -} -void ILibProcessPipe_WaitHandle_AddEx(ILibProcessPipe_Manager mgr, ILibProcessPipe_WaitHandle *waitHandle) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; - if (manager->workerThreadID == GetCurrentThreadId()) + ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)u; + if (!ILibMemory_CanaryOK((void*)u) || waitHandle->registeredHandle == NULL) { return; } + void *node = ILibLinkedList_GetNode_Search(waitHandle->parent->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_registeredHandle_Comparer, waitHandle->registeredHandle); + + if (node != NULL) { - // We're on the same thread, so we can just add it in - if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; } - ILibLinkedList_AddTail(manager->ActivePipes, waitHandle); - SetEvent(manager->updateEvent); + ILibLinkedList_Remove(node); + } + + UnregisterWait(waitHandle->registeredHandle); + waitHandle->registeredHandle = NULL; + ILibMemory_Free(waitHandle); +} +void __stdcall ILibProcessPipe_WaitHandle_reregister(ULONG_PTR u) +{ + if (!ILibMemory_CanaryOK((void*)u)) { return; } + ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)u; + + if (waitHandle->registeredHandle != NULL) + { + UnregisterWait(waitHandle->registeredHandle); + waitHandle->registeredHandle = NULL; + } + RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)waitHandle->timeout, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE); +} +void __stdcall ILibProcessPipe_WaitHandle_SignaledOrTimeout(void *user, BOOLEAN TimerOrWaitFired) +{ + if (!ILibMemory_CanaryOK(user)) { return; } + ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)user; + HANDLE chainHandle = ILibChain_GetMicrostackThreadHandle(waitHandle->parent->ChainLink.ParentChain); + + if (waitHandle->callback == NULL || waitHandle->callback(waitHandle->event, TimerOrWaitFired? ILibWaitHandle_ErrorStatus_TIMEOUT:ILibWaitHandle_ErrorStatus_NONE, waitHandle->user) == FALSE) + { + // Unregister + QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_unregister, chainHandle, (ULONG_PTR)waitHandle); } else { - // We're on a different thread, so we need to dispatch to the worker thread - QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add_APC, manager->workerThread, (ULONG_PTR)waitHandle); + // Re-Register + QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_reregister, chainHandle, (ULONG_PTR)waitHandle); } } void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback) { ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; ILibProcessPipe_WaitHandle *waitHandle; - waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_Allocate(sizeof(ILibProcessPipe_WaitHandle), 0, NULL, NULL); + waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle)); waitHandle->parent = manager; waitHandle->event = event; @@ -242,234 +261,39 @@ void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager m waitHandle->callback = callback; waitHandle->timeout = milliseconds; - ILibProcessPipe_WaitHandle_AddEx(mgr, waitHandle); -} - -void __stdcall ILibProcessPipe_WaitHandle_Add2_apcsink(ULONG_PTR obj) -{ - if (ILibMemory_CanaryOK((void*)obj)) + if (RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)milliseconds, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE) == 0) { - ILibProcessPipe_WaitHandle_APC *apcState = (ILibProcessPipe_WaitHandle_APC*)obj; - if (apcState->callback != NULL) { apcState->callback(apcState->ev, apcState->status, apcState->user); } - ILibMemory_Free(apcState); + // FAILED + ILibMemory_Free(waitHandle); } -} -BOOL ILibProcessPipe_WaitHandle_Add2_sink(HANDLE event, ILibWaitHandle_ErrorStatus status, void* user) -{ - if (ILibMemory_CanaryOK(user)) + else { - QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add2_apcsink, ((ILibProcessPipe_WaitHandle_APC*)user)->callingThread, (ULONG_PTR)user); + ILibLinkedList_AddTail(manager->ActivePipes, waitHandle); } - return(FALSE); } void ILibProcessPipe_WaitHandle_Add2_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback) { - ILibProcessPipe_WaitHandle_APC *apcState = ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle_APC)); - DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &apcState->callingThread, THREAD_SET_CONTEXT, FALSE, 0); - apcState->callback = callback; - apcState->user = user; - ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(mgr, event, milliseconds, apcState, ILibProcessPipe_WaitHandle_Add2_sink); -} + ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr; + ILibProcessPipe_WaitHandle *waitHandle; + waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle)); -void ILibProcessPipe_Manager_WindowsRunLoopEx(void *arg) -{ - ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)arg; - HANDLE hList[FD_SETSIZE * (2*sizeof(HANDLE))]; - ILibLinkedList active = manager->ActivePipes; - uint64_t timestamp1; - DWORD elapsedTime = 0; - void* node; - ILibProcessPipe_WaitHandle* data; + waitHandle->parent = manager; + waitHandle->event = event; + waitHandle->user = user; + waitHandle->callback = callback; + waitHandle->timeout = milliseconds; + waitHandle->contextSwitch = 1; - int i, jx; - DWORD x; - DWORD maxTimeout = INFINITE; - memset(hList, 0, sizeof(HANDLE)*FD_SETSIZE); - manager->workerThreadID = GetCurrentThreadId(); - - while (manager->abort == 0) + if (RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)milliseconds, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE) == 0) { - hList[0] = manager->updateEvent; - i = 1; - - //Prepare the rest of the WaitHandle Array, for the WaitForMultipleObject call - maxTimeout = INFINITE; - node = ILibLinkedList_GetNode_Head(active); - while (node != NULL) - { - if ((data = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node)) != NULL) - { - ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): [%p]", data); - - hList[i] = data->event; - hList[i + FD_SETSIZE] = (HANDLE)data; - if (data->timeout > 0) - { - if (elapsedTime > 0) - { - // We popped out of the wait, but didn't decrement timeRemaining yet - if (data->timeRemaining < (int)elapsedTime) - { - data->timeRemaining = 0; - } - else - { - data->timeRemaining -= (int)elapsedTime; - } - } - if ((DWORD)data->timeRemaining < maxTimeout) { maxTimeout = (DWORD)data->timeRemaining; } - } - ++i; - } - node = ILibLinkedList_GetNextNode(node); - } - - ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): Number of Handles: %d", i); - timestamp1 = (uint64_t)ILibGetUptime(); - - while ((x = WaitForMultipleObjectsEx(i, hList, FALSE, maxTimeout, TRUE)) != WAIT_IO_COMPLETION) - { - elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); - if (x == WAIT_FAILED || x == WAIT_TIMEOUT || (x-WAIT_OBJECT_0) == 0) { break; } - data = (ILibProcessPipe_WaitHandle*)hList[x + FD_SETSIZE]; - manager->activeWaitHandle = data; - if (data != NULL && data->callback != NULL) - { - if (data->callback(data->event, ILibWaitHandle_ErrorStatus_NONE, data->user) == FALSE) - { - // FALSE means to remove the WaitHandle - void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event); - if (node != NULL) - { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - break; - } - } - } - - if (maxTimeout != INFINITE) - { - // Need to recalculate maxTimeout - for (jx = 1; jx < i; ++jx) - { - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data->timeout > 0) // Timeout was specified - { - if (data->timeRemaining > (int)elapsedTime) // We can just decrement time left - { - data->timeRemaining -= (int)elapsedTime; - } - else - { - data->timeRemaining = 0; // Already expired - } - if (data->timeRemaining < (int)maxTimeout) - { - maxTimeout = (DWORD)data->timeRemaining; // Set the new maxTimeout - } - } - } - } - } - if (x == WAIT_IO_COMPLETION) { elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); } - if (x == WAIT_FAILED) - { - for (jx = 1; jx < i; ++jx) - { - if (WaitForSingleObject(hList[jx], 0) == WAIT_FAILED) - { - // This handle is invalid, so let's error it out and remove it - node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, hList[jx]); - if (node != NULL) - { - // Propagate an Error up - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data != NULL && data->callback != NULL) - { - manager->activeWaitHandle = data; - data->callback(data->event, ILibWaitHandle_ErrorStatus_INVALID_HANDLE, data->user); - } - - // Remove the wait handle - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - break; - } - } - } - } - if (x == WAIT_TIMEOUT) - { - for (jx = 1; jx < i; ++jx) - { - data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE]; - if (data->timeout > 0) - { - // A timeout was specified, so lets check it - if (data->timeRemaining <= (int)elapsedTime) - { - data->timeRemaining = 0; - } - else - { - data->timeRemaining -= (int)elapsedTime; - } - if (data->timeRemaining == 0) - { - manager->activeWaitHandle = data; - if (data->callback == NULL || data->callback(data->event, ILibWaitHandle_ErrorStatus_TIMEOUT, data->user) == FALSE) - { - // Remove Wait Handle - node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event); - if (node != NULL) - { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); - } - } - else - { - // Reset timeout - data->timeRemaining = data->timeout; - } - } - } - } - elapsedTime = 0; // Set this to zero, becuase we already decremented everyone's time remaining - } - ResetEvent(manager->updateEvent); + // FAILED + ILibMemory_Free(waitHandle); } - - // - // We need to enumerate the list of handles, and free them, because now that this thread - // is not in an alertable state, the parent will not be able to Queue any actions for cleanup - // - while ((node = ILibLinkedList_GetNode_Head(manager->ActivePipes)) != NULL) + else { - free(ILibLinkedList_GetDataFromNode(node)); - ILibLinkedList_Remove(node); + ILibLinkedList_AddTail(manager->ActivePipes, waitHandle); } } -ILibProcessPipe_Manager_WindowsRunLoop(void *arg) -{ - ILib_DumpEnabledContext winException; - __try - { - ILibProcessPipe_Manager_WindowsRunLoopEx(arg); - } - __except (ILib_WindowsExceptionFilterEx(GetExceptionCode(), GetExceptionInformation(), &winException)) - { - ILib_WindowsExceptionDebugEx(&winException); - } -} -void ILibProcessPipe_Manager_Start(void* chain, void* user) -{ - ILibProcessPipe_Manager_Object* man = (ILibProcessPipe_Manager_Object*)user; - - UNREFERENCED_PARAMETER(chain); - man->workerThread = ILibSpawnNormalThread((voidfp1)&ILibProcessPipe_Manager_WindowsRunLoop, user); -} #else void ILibProcessPipe_Process_ReadHandler(void* user); void ILibProcessPipe_Manager_OnPreSelect(void* object, fd_set *readset, fd_set *writeset, fd_set *errorset, int* blocktime) @@ -522,9 +346,7 @@ void ILibProcessPipe_Manager_OnDestroy(void *object) ILibProcessPipe_Manager_Object *man = (ILibProcessPipe_Manager_Object*)object; #ifdef WIN32 - man->abort = 1; - SetEvent(man->updateEvent); - WaitForSingleObject(man->workerThread, INFINITE); + // ToDo: Enumerate List, and unregister everything #endif ILibLinkedList_Destroy(man->ActivePipes); } @@ -538,10 +360,7 @@ ILibProcessPipe_Manager ILibProcessPipe_Manager_Create(void *chain) retVal->ChainLink.ParentChain = chain; retVal->ActivePipes = ILibLinkedList_CreateEx(sizeof(int)); -#ifdef WIN32 - retVal->updateEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - ILibChain_RunOnMicrostackThread(chain, ILibProcessPipe_Manager_Start, retVal); -#else +#ifndef WIN32 retVal->ChainLink.PreSelectHandler = &ILibProcessPipe_Manager_OnPreSelect; retVal->ChainLink.PostSelectHandler = &ILibProcessPipe_Manager_OnPostSelect; #endif @@ -716,13 +535,15 @@ void ILibProcessPipe_Process_Destroy(ILibProcessPipe_Process_Object *p) if (!ILibMemory_CanaryOK(p)) { return; } #ifdef WIN32 - ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data)); - data->pj = p; - data->h = CreateEvent(NULL, TRUE, FALSE, NULL); - // We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread. - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data); - WaitForSingleObjectEx(data->h, 3000, TRUE); - CloseHandle(data->h); + //ToDo: Do Something here + + //ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data)); + //data->pj = p; + //data->h = CreateEvent(NULL, TRUE, FALSE, NULL); + //// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread. + //QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data); + //WaitForSingleObjectEx(data->h, 3000, TRUE); + //CloseHandle(data->h); #else if (p->exiting != 0) { return; } if (p->stdIn != NULL) { ILibProcessPipe_FreePipe(p->stdIn); } @@ -1270,52 +1091,8 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject) } else { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData); - if (p->manager->workerThreadID == GetCurrentThreadId()) - { - // Already on the dispatch thread, so we can directly update the list - if (p->mOverlapped_opaqueData == NULL) - { - void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle); - if (node != NULL) - { - ILibLinkedList_Remove(node); - p->mOverlapped_opaqueData = p->manager->activeWaitHandle; - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData); - SetEvent(p->manager->updateEvent); // Just need to set event, so the wait list will be rebuilt - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list"); - } - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Already paused"); - } - } - else - { - // Not on the dispatch thread, so we'll have to dispatch - p->mOverlapped_opaqueData = NULL; - ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent); - } - - //if (p->mOverlapped_opaqueData == NULL) - //{ - // void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle); - // if (node != NULL) - // { - // ILibLinkedList_Remove(node); - // p->mOverlapped_opaqueData = p->manager->activeWaitHandle; - // ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData); - // SetEvent(p->manager->updateEvent); - // } - // else - // { - // ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list"); - // } - //} + ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause()"); + ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent); } #else ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject)); @@ -1368,59 +1145,20 @@ void ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(ILibProcessPipe_PipeObject } p->processingLoop = 0; } -#ifdef WIN32 -void __stdcall ILibProcessPipe_Pipe_ResumeEx_APC(ULONG_PTR obj) -{ - ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)obj; - - ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(p); - if (p->PAUSED == 0) - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Adding Pipe [%p]", (void*)p); - if (p->mOverlapped_opaqueData != NULL) - { - ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData); - p->mOverlapped_opaqueData = NULL; - } - else - { - ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler); - } - //printf("ReadFile(%p, %d, %d) (ResumeEx_APC)\n", p->mPipe_ReadEnd, p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead); - ReadFile(p->mPipe_ReadEnd, p->buffer + p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead, NULL, p->mOverlapped); - } - else - { - ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Skipping Pipe [%p]", (void*)p); - } -} -#endif void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p) { if (!ILibMemory_CanaryOK(p)) { return; } ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx(): processingLoop = %d", p->processingLoop); #ifdef WIN32 - if (p->manager->workerThreadID == GetCurrentThreadId()) + if (!ILibIsRunningOnChainThread(p->manager->ChainLink.ParentChain)) { - // We were called from the event dispatch (ie: we're on the same thread) - if (p->mOverlapped_opaqueData != NULL) - { - ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData); - p->mOverlapped_opaqueData = NULL; - } - else - { - ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler); - } - p->PAUSED = 0; - } - else - { - // We're on a different thread, so we need to dispatch to the worker thread - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Pipe_ResumeEx_APC, p->manager->workerThread, (ULONG_PTR)p); + QueueUserAPC((PAPCFUNC)NULL, ILibChain_GetMicrostackThreadHandle(p->manager->ChainLink.ParentChain), (ULONG_PTR)p); + return; } + ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler); + p->PAUSED = 0; return; #endif @@ -1582,8 +1320,11 @@ void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user) j->exitHandler(j, exitCode, j->userObject); j->exiting ^= 1; + + if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); } + // We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread. - QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j); + //QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j); } #ifdef WIN32 void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj)