1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-15 15:53:55 +00:00

Update to use new threading model for windows

This commit is contained in:
Bryan Roe
2020-05-07 10:29:49 -07:00
parent 47066bd825
commit 9668b8d77d
5 changed files with 226 additions and 680 deletions

View File

@@ -87,7 +87,6 @@ typedef struct ILibProcessPipe_PipeObject
ILibProcessPipe_GenericBrokenPipeHandler brokenPipeHandler;
void *user1, *user2;
#ifdef WIN32
int usingCompletionRoutine;
int cancelInProgress;
HANDLE mPipe_Reader_ResumeEvent;
HANDLE mPipe_ReadEnd;
@@ -166,298 +165,7 @@ ILibProcessPipe_Pipe ILibProcessPipe_Process_GetStdOut(ILibProcessPipe_Process p
}
#ifdef WIN32
BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user);
typedef struct ILibProcessPipe_WaitHandle
{
ILibProcessPipe_Manager_Object *parent;
HANDLE event;
void *user;
ILibProcessPipe_WaitHandle_Handler callback;
int timeRemaining;
int timeout;
}ILibProcessPipe_WaitHandle;
typedef struct ILibProcessPipe_WaitHandle_APC
{
HANDLE callingThread;
HANDLE ev;
ILibWaitHandle_ErrorStatus status;
ILibProcessPipe_WaitHandle_Handler callback;
void *chain;
void *user;
}ILibProcessPipe_WaitHandle_APC;
HANDLE ILibProcessPipe_Manager_GetWorkerThread(ILibProcessPipe_Manager mgr)
{
return(((ILibProcessPipe_Manager_Object*)mgr)->workerThread);
}
int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer(void *source, void *matchWith)
{
if (source == NULL) { return 1; }
return(((ILibProcessPipe_WaitHandle*)source)->event == matchWith ? 0 : 1);
}
void __stdcall ILibProcessPipe_WaitHandle_Remove_APC(ULONG_PTR obj)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)((void**)obj)[0];
HANDLE event = (HANDLE)((void**)obj)[1];
void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, event);
ILibProcessPipe_WaitHandle *waiter;
if (node != NULL)
{
waiter = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node);
if (waiter->callback != NULL) { waiter->callback(waiter->event, ILibWaitHandle_ErrorStatus_REMOVED, waiter->user); }
free(waiter);
ILibLinkedList_Remove(node);
SetEvent(manager->updateEvent);
}
free((void*)obj);
}
void ILibProcessPipe_WaitHandle_Remove(ILibProcessPipe_Manager mgr, HANDLE event)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
void **data = (void**)ILibMemory_Allocate(2 * sizeof(void*), 0, NULL, NULL);
data[0] = manager;
data[1] = event;
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, manager->workerThread, (ULONG_PTR)data);
}
void __stdcall ILibProcessPipe_WaitHandle_Add_APC(ULONG_PTR obj)
{
ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)obj;
if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; }
ILibLinkedList_AddTail(waitHandle->parent->ActivePipes, waitHandle);
}
void ILibProcessPipe_WaitHandle_AddEx(ILibProcessPipe_Manager mgr, ILibProcessPipe_WaitHandle *waitHandle)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
if (manager->workerThreadID == GetCurrentThreadId())
{
// We're on the same thread, so we can just add it in
if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; }
ILibLinkedList_AddTail(manager->ActivePipes, waitHandle);
SetEvent(manager->updateEvent);
}
else
{
// We're on a different thread, so we need to dispatch to the worker thread
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add_APC, manager->workerThread, (ULONG_PTR)waitHandle);
}
}
void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
ILibProcessPipe_WaitHandle *waitHandle;
waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_Allocate(sizeof(ILibProcessPipe_WaitHandle), 0, NULL, NULL);
waitHandle->parent = manager;
waitHandle->event = event;
waitHandle->user = user;
waitHandle->callback = callback;
waitHandle->timeout = milliseconds;
ILibProcessPipe_WaitHandle_AddEx(mgr, waitHandle);
}
void ILibProcessPipe_Manager_WindowsRunLoopEx(void *arg)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)arg;
HANDLE hList[FD_SETSIZE * (2*sizeof(HANDLE))];
ILibLinkedList active = manager->ActivePipes;
uint64_t timestamp1;
DWORD elapsedTime = 0;
void* node;
ILibProcessPipe_WaitHandle* data;
int i, jx;
DWORD x;
DWORD maxTimeout = INFINITE;
memset(hList, 0, sizeof(HANDLE)*FD_SETSIZE);
manager->workerThreadID = GetCurrentThreadId();
while (manager->abort == 0)
{
hList[0] = manager->updateEvent;
i = 1;
//Prepare the rest of the WaitHandle Array, for the WaitForMultipleObject call
maxTimeout = INFINITE;
node = ILibLinkedList_GetNode_Head(active);
while (node != NULL)
{
if ((data = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node)) != NULL)
{
ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): [%p]", data);
hList[i] = data->event;
hList[i + FD_SETSIZE] = (HANDLE)data;
if (data->timeout > 0)
{
if (elapsedTime > 0)
{
// We popped out of the wait, but didn't decrement timeRemaining yet
if (data->timeRemaining < (int)elapsedTime)
{
data->timeRemaining = 0;
}
else
{
data->timeRemaining -= (int)elapsedTime;
}
}
if ((DWORD)data->timeRemaining < maxTimeout) { maxTimeout = (DWORD)data->timeRemaining; }
}
++i;
}
node = ILibLinkedList_GetNextNode(node);
}
ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): Number of Handles: %d", i);
timestamp1 = (uint64_t)ILibGetUptime();
while ((x = WaitForMultipleObjectsEx(i, hList, FALSE, maxTimeout, TRUE)) != WAIT_IO_COMPLETION)
{
elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1);
if (x == WAIT_FAILED || x == WAIT_TIMEOUT || (x-WAIT_OBJECT_0) == 0) { break; }
data = (ILibProcessPipe_WaitHandle*)hList[x + FD_SETSIZE];
manager->activeWaitHandle = data;
if (data != NULL && data->callback != NULL)
{
if (data->callback(data->event, ILibWaitHandle_ErrorStatus_NONE, data->user) == FALSE)
{
// FALSE means to remove the WaitHandle
void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event);
if (node != NULL)
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
break;
}
}
}
if (maxTimeout != INFINITE)
{
// Need to recalculate maxTimeout
for (jx = 1; jx < i; ++jx)
{
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data->timeout > 0) // Timeout was specified
{
if (data->timeRemaining > (int)elapsedTime) // We can just decrement time left
{
data->timeRemaining -= (int)elapsedTime;
}
else
{
data->timeRemaining = 0; // Already expired
}
if (data->timeRemaining < (int)maxTimeout)
{
maxTimeout = (DWORD)data->timeRemaining; // Set the new maxTimeout
}
}
}
}
}
if (x == WAIT_IO_COMPLETION) { elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); }
if (x == WAIT_FAILED)
{
for (jx = 1; jx < i; ++jx)
{
if (WaitForSingleObject(hList[jx], 0) == WAIT_FAILED)
{
// This handle is invalid, so let's error it out and remove it
node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, hList[jx]);
if (node != NULL)
{
// Propagate an Error up
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data != NULL && data->callback != NULL)
{
manager->activeWaitHandle = data;
data->callback(data->event, ILibWaitHandle_ErrorStatus_INVALID_HANDLE, data->user);
}
// Remove the wait handle
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
break;
}
}
}
}
if (x == WAIT_TIMEOUT)
{
for (jx = 1; jx < i; ++jx)
{
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data->timeout > 0)
{
// A timeout was specified, so lets check it
if (data->timeRemaining <= (int)elapsedTime)
{
data->timeRemaining = 0;
}
else
{
data->timeRemaining -= (int)elapsedTime;
}
if (data->timeRemaining == 0)
{
manager->activeWaitHandle = data;
if (data->callback == NULL || data->callback(data->event, ILibWaitHandle_ErrorStatus_TIMEOUT, data->user) == FALSE)
{
// Remove Wait Handle
node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event);
if (node != NULL)
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
}
}
else
{
// Reset timeout
data->timeRemaining = data->timeout;
}
}
}
}
elapsedTime = 0; // Set this to zero, becuase we already decremented everyone's time remaining
}
ResetEvent(manager->updateEvent);
}
//
// We need to enumerate the list of handles, and free them, because now that this thread
// is not in an alertable state, the parent will not be able to Queue any actions for cleanup
//
while ((node = ILibLinkedList_GetNode_Head(manager->ActivePipes)) != NULL)
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
}
}
ILibProcessPipe_Manager_WindowsRunLoop(void *arg)
{
ILib_DumpEnabledContext winException;
__try
{
ILibProcessPipe_Manager_WindowsRunLoopEx(arg);
}
__except (ILib_WindowsExceptionFilterEx(GetExceptionCode(), GetExceptionInformation(), &winException))
{
ILib_WindowsExceptionDebugEx(&winException);
}
}
void ILibProcessPipe_Manager_Start(void* chain, void* user)
{
ILibProcessPipe_Manager_Object* man = (ILibProcessPipe_Manager_Object*)user;
UNREFERENCED_PARAMETER(chain);
man->workerThread = ILibSpawnNormalThread((voidfp1)&ILibProcessPipe_Manager_WindowsRunLoop, user);
}
BOOL ILibProcessPipe_Process_OnExit(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user);
#else
void ILibProcessPipe_Process_ReadHandler(void* user);
void ILibProcessPipe_Manager_OnPreSelect(void* object, fd_set *readset, fd_set *writeset, fd_set *errorset, int* blocktime)
@@ -526,10 +234,7 @@ ILibProcessPipe_Manager ILibProcessPipe_Manager_Create(void *chain)
retVal->ChainLink.ParentChain = chain;
retVal->ActivePipes = ILibLinkedList_CreateEx(sizeof(int));
#ifdef WIN32
retVal->updateEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
ILibChain_RunOnMicrostackThread(chain, ILibProcessPipe_Manager_Start, retVal);
#else
#ifndef WIN32
retVal->ChainLink.PreSelectHandler = &ILibProcessPipe_Manager_OnPreSelect;
retVal->ChainLink.PostSelectHandler = &ILibProcessPipe_Manager_OnPostSelect;
#endif
@@ -558,10 +263,22 @@ void ILibProcessPipe_FreePipe(ILibProcessPipe_PipeObject *pipeObject)
{
CloseHandle(pipeObject->mPipe_WriteEnd);
}
if (pipeObject->mOverlapped != NULL && pipeObject->usingCompletionRoutine == 0) { CloseHandle(pipeObject->mOverlapped->hEvent); free(pipeObject->mOverlapped); }
if (pipeObject->mwOverlapped != NULL) { free(pipeObject->mwOverlapped); }
if (pipeObject->mOverlapped != NULL)
{
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent);
CloseHandle(pipeObject->mOverlapped->hEvent); free(pipeObject->mOverlapped);
}
if (pipeObject->mwOverlapped != NULL)
{
if (pipeObject->mwOverlapped->hEvent != NULL)
{
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mwOverlapped->hEvent);
CloseHandle(pipeObject->mwOverlapped->hEvent);
}
free(pipeObject->mwOverlapped);
}
if (pipeObject->mPipe_Reader_ResumeEvent != NULL) { CloseHandle(pipeObject->mPipe_Reader_ResumeEvent); }
if (pipeObject->buffer != NULL && pipeObject->usingCompletionRoutine == 0) { free(pipeObject->buffer); }
if (pipeObject->buffer != NULL) { free(pipeObject->buffer); }
#else
if (pipeObject->manager != NULL)
{
@@ -591,11 +308,7 @@ void ILibProcessPipe_FreePipe(ILibProcessPipe_PipeObject *pipeObject)
if (pipeObject->mProcess->stdOut == pipeObject) { pipeObject->mProcess->stdOut = NULL; }
if (pipeObject->mProcess->stdErr == pipeObject) { pipeObject->mProcess->stdErr = NULL; }
}
#ifdef WIN32
if (pipeObject->usingCompletionRoutine == 0) { ILibMemory_Free(pipeObject); }
#else
ILibMemory_Free(pipeObject);
#endif
}
#ifdef WIN32
@@ -686,48 +399,15 @@ ILibProcessPipe_PipeObject* ILibProcessPipe_CreatePipe(ILibProcessPipe_Manager m
return retVal;
}
#ifdef WIN32
typedef struct ILibProcessPipe_Process_Destroy_WinRunThread_Data
{
ILibProcessPipe_Process_Object *pj;
HANDLE h;
}ILibProcessPipe_Process_Destroy_WinRunThread_Data;
void __stdcall ILibProcessPipe_Process_Destroy_WinRunThread(ULONG_PTR obj)
{
ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = (ILibProcessPipe_Process_Destroy_WinRunThread_Data*)obj;
if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj))
{
if (data->pj->exiting == 0)
{
if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdIn != NULL) { ILibProcessPipe_FreePipe(data->pj->stdIn); }
if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdOut != NULL) { ILibProcessPipe_FreePipe(data->pj->stdOut); }
if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj) && data->pj->stdErr != NULL) { ILibProcessPipe_FreePipe(data->pj->stdErr); }
if (ILibMemory_CanaryOK(data) && ILibMemory_CanaryOK(data->pj)) { ILibMemory_Free(data->pj); }
}
}
SetEvent(data->h);
}
#endif
void ILibProcessPipe_Process_Destroy(ILibProcessPipe_Process_Object *p)
{
if (!ILibMemory_CanaryOK(p)) { return; }
#ifdef WIN32
ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data));
data->pj = p;
data->h = CreateEvent(NULL, TRUE, FALSE, NULL);
// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread.
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data);
WaitForSingleObjectEx(data->h, 3000, TRUE);
CloseHandle(data->h);
#else
if (p->exiting != 0) { return; }
if (p->stdIn != NULL) { ILibProcessPipe_FreePipe(p->stdIn); }
if (p->stdOut != NULL) { ILibProcessPipe_FreePipe(p->stdOut); }
if (p->stdErr != NULL) { ILibProcessPipe_FreePipe(p->stdErr); }
ILibMemory_Free(p);
#endif
}
#ifndef WIN32
void ILibProcessPipe_Process_BrokenPipeSink_DestroyHandler(void *object)
@@ -1073,7 +753,7 @@ void ILibProcessPipe_Pipe_SwapBuffers(ILibProcessPipe_Pipe obj, char* newBuffer,
}
#ifdef WIN32
BOOL ILibProcessPipe_Process_ReadHandler(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
BOOL ILibProcessPipe_Process_ReadHandler(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
#else
void ILibProcessPipe_Process_ReadHandler(void* user)
#endif
@@ -1198,10 +878,7 @@ void ILibProcessPipe_Process_ReadHandler(void* user)
// Broken Pipe
//
ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[ReadHandler]: BrokenPipe(%d) on Pipe: %p", err, (void*)pipeObject);
#ifdef WIN32
ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop
ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(pipeObject->manager->ActivePipes, NULL, pipeObject));
#else
#ifndef WIN32
void *pipenode = ILibLinkedList_GetNode_Search(pipeObject->manager->ActivePipes, NULL, pipeObject);
if (pipenode != NULL)
{
@@ -1233,7 +910,7 @@ void ILibProcessPipe_Process_ReadHandler(void* user)
#endif
}
#ifdef WIN32
BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
BOOL ILibProcessPipe_Process_WindowsWriteHandler(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
{
if (errors != ILibWaitHandle_ErrorStatus_NONE) { return(FALSE); }
ILibProcessPipe_PipeObject *pipeObject = (ILibProcessPipe_PipeObject*)user;
@@ -1246,11 +923,11 @@ BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_Er
if (result == FALSE)
{
// Broken Pipe
ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent);
ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[WriteHandler]: BrokenPipe(%d) on Pipe: %p", GetLastError(), (void*)pipeObject);
if (pipeObject->brokenPipeHandler != NULL) { ((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject); }
ILibProcessPipe_FreePipe(pipeObject);
return(FALSE);
return(TRUE);
}
ILibQueue_Lock(pipeObject->WriteBuffer);
@@ -1267,16 +944,17 @@ BOOL ILibProcessPipe_Process_WindowsWriteHandler(HANDLE event, ILibWaitHandle_Er
// Broken Pipe
ILibQueue_UnLock(pipeObject->WriteBuffer);
ILibRemoteLogging_printf(ILibChainGetLogger(pipeObject->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Pipe, ILibRemoteLogging_Flags_VerbosityLevel_1, "ILibProcessPipe[WriteHandler]: BrokenPipe(%d) on Pipe: %p", GetLastError(), (void*)pipeObject);
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent);
if (pipeObject->brokenPipeHandler != NULL) { ((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject); }
ILibProcessPipe_FreePipe(pipeObject);
return(FALSE);
return(TRUE);
}
break;
}
}
if (ILibQueue_IsEmpty(pipeObject->WriteBuffer) != 0)
{
ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent);
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent);
ILibQueue_UnLock(pipeObject->WriteBuffer);
if (pipeObject->handler != NULL) ((ILibProcessPipe_GenericSendOKHandler)pipeObject->handler)(pipeObject->user1, pipeObject->user2);
}
@@ -1314,51 +992,7 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject)
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData);
if (p->manager->workerThreadID == GetCurrentThreadId())
{
// Already on the dispatch thread, so we can directly update the list
if (p->mOverlapped_opaqueData == NULL)
{
void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle);
if (node != NULL)
{
ILibLinkedList_Remove(node);
p->mOverlapped_opaqueData = p->manager->activeWaitHandle;
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData);
SetEvent(p->manager->updateEvent); // Just need to set event, so the wait list will be rebuilt
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list");
}
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Already paused");
}
}
else
{
// Not on the dispatch thread, so we'll have to dispatch
p->mOverlapped_opaqueData = NULL;
ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent);
}
//if (p->mOverlapped_opaqueData == NULL)
//{
// void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle);
// if (node != NULL)
// {
// ILibLinkedList_Remove(node);
// p->mOverlapped_opaqueData = p->manager->activeWaitHandle;
// ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData);
// SetEvent(p->manager->updateEvent);
// }
// else
// {
// ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list");
// }
//}
ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent);
}
#else
ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject));
@@ -1411,63 +1045,15 @@ void ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(ILibProcessPipe_PipeObject
}
p->processingLoop = 0;
}
#ifdef WIN32
void __stdcall ILibProcessPipe_Pipe_ResumeEx_APC(ULONG_PTR obj)
{
ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)obj;
ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(p);
if (p->PAUSED == 0)
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Adding Pipe [%p]", (void*)p);
if (p->mOverlapped_opaqueData != NULL)
{
ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData);
p->mOverlapped_opaqueData = NULL;
}
else
{
ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler);
}
//printf("ReadFile(%p, %d, %d) (ResumeEx_APC)\n", p->mPipe_ReadEnd, p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead);
if (p->inProgress == 0)
{
p->inProgress = 1;
ReadFile(p->mPipe_ReadEnd, p->buffer + p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead, NULL, p->mOverlapped);
}
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Skipping Pipe [%p]", (void*)p);
}
}
#endif
void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p)
{
if (!ILibMemory_CanaryOK(p)) { return; }
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx(): processingLoop = %d", p->processingLoop);
#ifdef WIN32
if (p->manager->workerThreadID == GetCurrentThreadId())
{
// We were called from the event dispatch (ie: we're on the same thread)
if (p->mOverlapped_opaqueData != NULL)
{
ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData);
p->mOverlapped_opaqueData = NULL;
}
else
{
ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler);
}
p->PAUSED = 0;
}
else
{
// We're on a different thread, so we need to dispatch to the worker thread
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Pipe_ResumeEx_APC, p->manager->workerThread, (ULONG_PTR)p);
}
ILibChain_AddWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, p);
p->PAUSED = 0;
return;
#endif
@@ -1492,7 +1078,7 @@ void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject)
if (p->mProcess != NULL && p->mProcess->hProcess_needAdd != 0 && p->mProcess->disabled == 0)
{
p->mProcess->hProcess_needAdd = 0;
ILibProcessPipe_WaitHandle_Add(p->manager, p->mProcess->hProcess, p->mProcess, ILibProcessPipe_Process_OnExit);
ILibChain_AddWaitHandle(p->manager->ChainLink.ParentChain, p->mProcess->hProcess, -1, ILibProcessPipe_Process_OnExit, p->mProcess);
}
}
#else
@@ -1576,7 +1162,7 @@ void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObj
//printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize);
pipeObject->inProgress = 1;
result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped);
ILibProcessPipe_WaitHandle_Add(pipeObject->manager, pipeObject->mOverlapped->hEvent, pipeObject, &ILibProcessPipe_Process_ReadHandler);
ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject);
}
else
{
@@ -1605,11 +1191,6 @@ void ILibProcessPipe_Process_PipeHandler_StdIn(void *user1, void *user2)
}
#ifdef WIN32
void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_DestroySink(ULONG_PTR obj)
{
ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)obj;
if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); }
}
void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user)
{
ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)user;
@@ -1617,22 +1198,12 @@ void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user)
BOOL result;
if (j->disabled != 0) { return; }
if (ILibChain_SelectInterrupted(j->chain) != 0)
{
// Winsock appears to be at a select call, so this must've been triggered by an APC, so we must unroll the callstack to continue,
// because winsock is not re-entrant, so we cannot risk making another winsock call directly.
//
ILibChain_RunOnMicrostackThreadEx2(j->chain, ILibProcessPipe_Process_OnExit_ChainSink, user, 0);
return;
}
result = GetExitCodeProcess(j->hProcess, &exitCode);
j->exiting = 1;
j->exitHandler(j, exitCode, j->userObject);
j->exiting ^= 1;
// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread.
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j);
if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); }
}
#ifdef WIN32
void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj)
@@ -1640,13 +1211,13 @@ void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj)
ILibProcessPipe_Process_OnExit_ChainSink(NULL, (void*)obj);
}
#endif
BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
BOOL ILibProcessPipe_Process_OnExit(void *chain, HANDLE event, ILibWaitHandle_ErrorStatus errors, void* user)
{
ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)user;
if (errors != ILibWaitHandle_ErrorStatus_NONE) { return(FALSE); }
UNREFERENCED_PARAMETER(event);
ILibProcessPipe_WaitHandle_Remove(j->parent, j->hProcess);
ILibChain_RemoveWaitHandle(j->chain, j->hProcess);
if ((j->stdOut->PAUSED != 0 && j->stdOut->totalRead > 0) || (j->stdErr->PAUSED != 0 && j->stdErr->totalRead > 0))
{
@@ -1656,19 +1227,14 @@ BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus err
{
if (j->exitHandler != NULL)
{
// Everyone's lives will be made easier, by context switching to chain thread before making this call
#ifdef WIN32
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_APC, ILibChain_GetMicrostackThreadHandle(j->parent->ChainLink.ParentChain), (ULONG_PTR)user);
#else
ILibChain_RunOnMicrostackThread(j->parent->ChainLink.ParentChain, ILibProcessPipe_Process_OnExit_ChainSink, user);
#endif
ILibProcessPipe_Process_OnExit_ChainSink(j->chain, user);
}
else
{
ILibProcessPipe_Process_Destroy(j);
}
}
return(FALSE);
return(TRUE);
}
#endif
void ILibProcessPipe_Process_UpdateUserObject(ILibProcessPipe_Process module, void *userObj)
@@ -1682,7 +1248,7 @@ void ILibProcessPipe_Process_RemoveHandlers(ILibProcessPipe_Process module)
if (j != NULL && ILibMemory_CanaryOK(j))
{
j->disabled = 1;
ILibProcessPipe_WaitHandle_Remove(j->parent, j->hProcess);
ILibChain_RemoveWaitHandle(j->chain, j->hProcess);
}
}
#endif
@@ -1699,7 +1265,7 @@ void ILibProcessPipe_Process_AddHandlers(ILibProcessPipe_Process module, int buf
ILibProcessPipe_Process_SetWriteHandler(j->stdIn, &ILibProcessPipe_Process_PipeHandler_StdIn, j, sendOk);
#ifdef WIN32
ILibProcessPipe_WaitHandle_Add(j->parent, j->hProcess, j, &ILibProcessPipe_Process_OnExit);
ILibChain_AddWaitHandle(j->parent->ChainLink.ParentChain, j->hProcess, -1, ILibProcessPipe_Process_OnExit, j);
#endif
}
}
@@ -1758,7 +1324,7 @@ ILibTransport_DoneState ILibProcessPipe_Pipe_Write(ILibProcessPipe_Pipe po, char
retVal = ILibTransport_DoneState_INCOMPLETE;
ILibQueue_EnQueue(pipeObject->WriteBuffer, ILibProcessPipe_WriteData_Create(buffer, bufferLen, ownership));
#ifdef WIN32
ILibProcessPipe_WaitHandle_Add(pipeObject->manager, pipeObject->mOverlapped->hEvent, pipeObject, &ILibProcessPipe_Process_WindowsWriteHandler);
ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_WindowsWriteHandler, pipeObject);
#else
ILibLifeTime_Add(ILibGetBaseTimer(pipeObject->manager->ChainLink.ParentChain), pipeObject, 0, &ILibProcessPipe_Process_StartPipeReaderWriterEx, NULL); // Need to context switch to Chain Thread
#endif
@@ -1779,7 +1345,7 @@ ILibTransport_DoneState ILibProcessPipe_Pipe_Write(ILibProcessPipe_Pipe po, char
#ifdef WIN32
if (pipeObject->manager != NULL)
{
ILibProcessPipe_WaitHandle_Remove(pipeObject->manager, pipeObject->mOverlapped->hEvent); // Pipe Broken, so remove ourselves from the processing loop
ILibChain_RemoveWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent);
}
#endif
((ILibProcessPipe_GenericBrokenPipeHandler)pipeObject->brokenPipeHandler)(pipeObject);
@@ -1895,7 +1461,6 @@ int ILibProcessPipe_Pipe_ReadEx(ILibProcessPipe_Pipe targetPipe, char *buffer, i
{
if (GetLastError() == ERROR_IO_PENDING)
{
j->usingCompletionRoutine = 1;
j->buffer = buffer;
j->bufferSize = bufferLength;
j->user1 = user;