1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-06 00:13:33 +00:00
This commit is contained in:
Bryan Roe
2019-11-27 15:20:31 -08:00
parent 847bb0aeb0
commit 8ece5d4cbd
2 changed files with 111 additions and 363 deletions

View File

@@ -451,7 +451,8 @@ ILibTransport_DoneState ILibDuktape_HECI_Session_WriteSink(ILibDuktape_DuplexStr
{
#if defined(WIN32)
state->returnIgnored = 1;
QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_WriteHandler, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)state);
ILibDuktape_HECI_Session_WriteHandler((ULONG_PTR)state);
//QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_WriteHandler, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)state);
#elif defined(_POSIX)
if (ILibIsRunningOnChainThread(stream->readableStream->chain) != 0)
{
@@ -534,12 +535,15 @@ void ILibDuktape_HECI_Session_ResumeSink(ILibDuktape_DuplexStream *sender, void
if (session->noPipelining != 0)
{
ILibChain_RunOnMicrostackThread(sender->readableStream->chain, ILibDuktape_HECI_Session_ResumeSink_NoPipeline, session);
// Note: DO NOT 'return' here, because we still need to QueueUserAPC, to resume the stream on Windows
// Note: DO NOT 'return' here, because we still need to resume the stream on Windows
}
#ifdef WIN32
// To Resume, we need to first context switch to the Windows Thread
QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_ResumeSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session);
BOOL result = ReadFile(session->descriptor, session->buffer, (DWORD)session->bufferSize, &(session->bytesRead), &(session->v));
if (result == TRUE || GetLastError() == ERROR_IO_PENDING)
{
ILibProcessPipe_WaitHandle_Add(session->mgr, session->v.hEvent, session, ILibDuktape_HECI_Session_ReceiveSink);
}
#endif
}
#ifdef WIN32
@@ -645,7 +649,8 @@ duk_ret_t ILibDuktape_HECI_create_OnClientConnect(duk_context *ctx)
duk_get_prop_string(ctx, -1, ILibDuktape_HECI_ChildProcess); // [HECI][childProcess]
duk_get_prop_string(ctx, -1, ILibDuktape_ChildProcess_Manager); // [HECI][childProcess][manager]
session->mgr = (ILibProcessPipe_Manager)duk_get_pointer(ctx, -1);
QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_Start, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session);
ILibDuktape_HECI_Session_Start((ULONG_PTR)session);
//QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_Start, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session);
#else
duk_push_this(ctx); // [HECI]
session->descriptor = Duktape_GetIntPropertyValue(ctx, -1, ILibDuktape_HECI_Descriptor, -1);
@@ -727,7 +732,8 @@ duk_ret_t ILibDuktape_HECI_Session_close(duk_context *ctx)
ILibProcessPipe_WaitHandle_Remove(session->mgr, session->v.hEvent);
ILibProcessPipe_WaitHandle_Remove(session->mgr, session->wv.hEvent);
session->stream = NULL;
QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_CloseSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session->descriptor);
CloseHandle(session->descriptor);
//QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_Session_CloseSink2, ILibProcessPipe_Manager_GetWorkerThread(session->mgr), (ULONG_PTR)session->descriptor);
}
#else
int d = Duktape_GetIntPropertyValue(ctx, -1, ILibDuktape_HECI_Descriptor, -1);
@@ -952,7 +958,8 @@ duk_ret_t ILibDuktape_HECI_doIoctl(duk_context *ctx)
duk_get_prop_string(ctx, -2, ILibDuktape_HECI_ChildProcess); // [heci][stash][childProcess]
duk_get_prop_string(ctx, -1, ILibDuktape_ChildProcess_Manager); // [heci][stash][childProcess][manager]
data->pipeManager = (ILibProcessPipe_Manager)duk_get_pointer(ctx, -1);
QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_apc_AddIoctl, ILibProcessPipe_Manager_GetWorkerThread(data->pipeManager), (ULONG_PTR)data);
ILibDuktape_HECI_apc_AddIoctl((ULONG_PTR)data);
//QueueUserAPC((PAPCFUNC)ILibDuktape_HECI_apc_AddIoctl, ILibProcessPipe_Manager_GetWorkerThread(data->pipeManager), (ULONG_PTR)data);
#elif defined(_POSIX)
ILibDuktape_HECI_AddIoctl(data);
#endif

View File

@@ -47,14 +47,6 @@ typedef struct ILibProcessPipe_Manager_Object
{
ILibChain_Link ChainLink;
ILibLinkedList ActivePipes;
#ifdef WIN32
int abort;
HANDLE updateEvent;
HANDLE workerThread;
DWORD workerThreadID;
void *activeWaitHandle;
#endif
}ILibProcessPipe_Manager_Object;
struct ILibProcessPipe_PipeObject;
@@ -84,7 +76,7 @@ typedef struct ILibProcessPipe_PipeObject
HANDLE mPipe_ReadEnd;
HANDLE mPipe_WriteEnd;
OVERLAPPED *mOverlapped,*mwOverlapped;
void *mOverlapped_opaqueData, *user3, *user4;
void *user3, *user4;
#else
int mPipe_ReadEnd, mPipe_WriteEnd;
#endif
@@ -158,11 +150,12 @@ BOOL ILibProcessPipe_Process_OnExit(HANDLE event, ILibWaitHandle_ErrorStatus err
typedef struct ILibProcessPipe_WaitHandle
{
ILibProcessPipe_Manager_Object *parent;
HANDLE event;
HANDLE event, registeredHandle;
void *user;
ILibProcessPipe_WaitHandle_Handler callback;
int timeRemaining;
int timeout;
int contextSwitch;
}ILibProcessPipe_WaitHandle;
typedef struct ILibProcessPipe_WaitHandle_APC
{
@@ -172,30 +165,34 @@ typedef struct ILibProcessPipe_WaitHandle_APC
ILibProcessPipe_WaitHandle_Handler callback;
void *user;
}ILibProcessPipe_WaitHandle_APC;
HANDLE ILibProcessPipe_Manager_GetWorkerThread(ILibProcessPipe_Manager mgr)
{
return(((ILibProcessPipe_Manager_Object*)mgr)->workerThread);
}
int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer(void *source, void *matchWith)
void __stdcall ILibProcessPipe_WaitHandle_SignaledOrTimeout(void *user, BOOLEAN TimerOrWaitFired); // Prototype
int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_event_Comparer(void *source, void *matchWith)
{
if (source == NULL) { return 1; }
return(((ILibProcessPipe_WaitHandle*)source)->event == matchWith ? 0 : 1);
}
int ILibProcessPipe_Manager_WindowsWaitHandles_Remove_registeredHandle_Comparer(void *source, void *matchWith)
{
if (source == NULL) { return 1; }
return(((ILibProcessPipe_WaitHandle*)source)->registeredHandle == matchWith ? 0 : 1);
}
void __stdcall ILibProcessPipe_WaitHandle_Remove_APC(ULONG_PTR obj)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)((void**)obj)[0];
HANDLE event = (HANDLE)((void**)obj)[1];
void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, event);
void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_event_Comparer, event);
ILibProcessPipe_WaitHandle *waiter;
if (node != NULL)
{
waiter = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node);
free(waiter);
ILibLinkedList_Remove(node);
SetEvent(manager->updateEvent);
if (waiter->registeredHandle != NULL)
{
UnregisterWait(waiter->registeredHandle); waiter->registeredHandle = NULL;
}
ILibMemory_Free(waiter);
ILibLinkedList_Remove(node);
}
free((void*)obj);
}
@@ -206,35 +203,57 @@ void ILibProcessPipe_WaitHandle_Remove(ILibProcessPipe_Manager mgr, HANDLE event
data[0] = manager;
data[1] = event;
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, manager->workerThread, (ULONG_PTR)data);
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Remove_APC, ILibChain_GetMicrostackThreadHandle(manager->ChainLink.ParentChain), (ULONG_PTR)data);
}
void __stdcall ILibProcessPipe_WaitHandle_Add_APC(ULONG_PTR obj)
void __stdcall ILibProcessPipe_WaitHandle_unregister(ULONG_PTR u)
{
ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)obj;
if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; }
ILibLinkedList_AddTail(waitHandle->parent->ActivePipes, waitHandle);
}
void ILibProcessPipe_WaitHandle_AddEx(ILibProcessPipe_Manager mgr, ILibProcessPipe_WaitHandle *waitHandle)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
if (manager->workerThreadID == GetCurrentThreadId())
ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)u;
if (!ILibMemory_CanaryOK((void*)u) || waitHandle->registeredHandle == NULL) { return; }
void *node = ILibLinkedList_GetNode_Search(waitHandle->parent->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_registeredHandle_Comparer, waitHandle->registeredHandle);
if (node != NULL)
{
// We're on the same thread, so we can just add it in
if (waitHandle->timeout > 0) { waitHandle->timeRemaining = waitHandle->timeout; }
ILibLinkedList_AddTail(manager->ActivePipes, waitHandle);
SetEvent(manager->updateEvent);
ILibLinkedList_Remove(node);
}
UnregisterWait(waitHandle->registeredHandle);
waitHandle->registeredHandle = NULL;
ILibMemory_Free(waitHandle);
}
void __stdcall ILibProcessPipe_WaitHandle_reregister(ULONG_PTR u)
{
if (!ILibMemory_CanaryOK((void*)u)) { return; }
ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)u;
if (waitHandle->registeredHandle != NULL)
{
UnregisterWait(waitHandle->registeredHandle);
waitHandle->registeredHandle = NULL;
}
RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)waitHandle->timeout, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE);
}
void __stdcall ILibProcessPipe_WaitHandle_SignaledOrTimeout(void *user, BOOLEAN TimerOrWaitFired)
{
if (!ILibMemory_CanaryOK(user)) { return; }
ILibProcessPipe_WaitHandle *waitHandle = (ILibProcessPipe_WaitHandle*)user;
HANDLE chainHandle = ILibChain_GetMicrostackThreadHandle(waitHandle->parent->ChainLink.ParentChain);
if (waitHandle->callback == NULL || waitHandle->callback(waitHandle->event, TimerOrWaitFired? ILibWaitHandle_ErrorStatus_TIMEOUT:ILibWaitHandle_ErrorStatus_NONE, waitHandle->user) == FALSE)
{
// Unregister
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_unregister, chainHandle, (ULONG_PTR)waitHandle);
}
else
{
// We're on a different thread, so we need to dispatch to the worker thread
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add_APC, manager->workerThread, (ULONG_PTR)waitHandle);
// Re-Register
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_reregister, chainHandle, (ULONG_PTR)waitHandle);
}
}
void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
ILibProcessPipe_WaitHandle *waitHandle;
waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_Allocate(sizeof(ILibProcessPipe_WaitHandle), 0, NULL, NULL);
waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle));
waitHandle->parent = manager;
waitHandle->event = event;
@@ -242,234 +261,39 @@ void ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(ILibProcessPipe_Manager m
waitHandle->callback = callback;
waitHandle->timeout = milliseconds;
ILibProcessPipe_WaitHandle_AddEx(mgr, waitHandle);
}
void __stdcall ILibProcessPipe_WaitHandle_Add2_apcsink(ULONG_PTR obj)
{
if (ILibMemory_CanaryOK((void*)obj))
if (RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)milliseconds, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE) == 0)
{
ILibProcessPipe_WaitHandle_APC *apcState = (ILibProcessPipe_WaitHandle_APC*)obj;
if (apcState->callback != NULL) { apcState->callback(apcState->ev, apcState->status, apcState->user); }
ILibMemory_Free(apcState);
// FAILED
ILibMemory_Free(waitHandle);
}
}
BOOL ILibProcessPipe_WaitHandle_Add2_sink(HANDLE event, ILibWaitHandle_ErrorStatus status, void* user)
{
if (ILibMemory_CanaryOK(user))
else
{
QueueUserAPC((PAPCFUNC)ILibProcessPipe_WaitHandle_Add2_apcsink, ((ILibProcessPipe_WaitHandle_APC*)user)->callingThread, (ULONG_PTR)user);
ILibLinkedList_AddTail(manager->ActivePipes, waitHandle);
}
return(FALSE);
}
void ILibProcessPipe_WaitHandle_Add2_WithNonZeroTimeout(ILibProcessPipe_Manager mgr, HANDLE event, int milliseconds, void *user, ILibProcessPipe_WaitHandle_Handler callback)
{
ILibProcessPipe_WaitHandle_APC *apcState = ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle_APC));
DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &apcState->callingThread, THREAD_SET_CONTEXT, FALSE, 0);
apcState->callback = callback;
apcState->user = user;
ILibProcessPipe_WaitHandle_Add_WithNonZeroTimeout(mgr, event, milliseconds, apcState, ILibProcessPipe_WaitHandle_Add2_sink);
}
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)mgr;
ILibProcessPipe_WaitHandle *waitHandle;
waitHandle = (ILibProcessPipe_WaitHandle*)ILibMemory_SmartAllocate(sizeof(ILibProcessPipe_WaitHandle));
void ILibProcessPipe_Manager_WindowsRunLoopEx(void *arg)
{
ILibProcessPipe_Manager_Object *manager = (ILibProcessPipe_Manager_Object*)arg;
HANDLE hList[FD_SETSIZE * (2*sizeof(HANDLE))];
ILibLinkedList active = manager->ActivePipes;
uint64_t timestamp1;
DWORD elapsedTime = 0;
void* node;
ILibProcessPipe_WaitHandle* data;
waitHandle->parent = manager;
waitHandle->event = event;
waitHandle->user = user;
waitHandle->callback = callback;
waitHandle->timeout = milliseconds;
waitHandle->contextSwitch = 1;
int i, jx;
DWORD x;
DWORD maxTimeout = INFINITE;
memset(hList, 0, sizeof(HANDLE)*FD_SETSIZE);
manager->workerThreadID = GetCurrentThreadId();
while (manager->abort == 0)
if (RegisterWaitForSingleObject(&(waitHandle->registeredHandle), waitHandle->event, ILibProcessPipe_WaitHandle_SignaledOrTimeout, waitHandle, (ULONG)milliseconds, WT_EXECUTEINPERSISTENTTHREAD | WT_EXECUTEONLYONCE) == 0)
{
hList[0] = manager->updateEvent;
i = 1;
//Prepare the rest of the WaitHandle Array, for the WaitForMultipleObject call
maxTimeout = INFINITE;
node = ILibLinkedList_GetNode_Head(active);
while (node != NULL)
{
if ((data = (ILibProcessPipe_WaitHandle*)ILibLinkedList_GetDataFromNode(node)) != NULL)
{
ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): [%p]", data);
hList[i] = data->event;
hList[i + FD_SETSIZE] = (HANDLE)data;
if (data->timeout > 0)
{
if (elapsedTime > 0)
{
// We popped out of the wait, but didn't decrement timeRemaining yet
if (data->timeRemaining < (int)elapsedTime)
{
data->timeRemaining = 0;
}
else
{
data->timeRemaining -= (int)elapsedTime;
}
}
if ((DWORD)data->timeRemaining < maxTimeout) { maxTimeout = (DWORD)data->timeRemaining; }
}
++i;
}
node = ILibLinkedList_GetNextNode(node);
}
ILibRemoteLogging_printf(ILibChainGetLogger(manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "WindowsRunLoopEx(): Number of Handles: %d", i);
timestamp1 = (uint64_t)ILibGetUptime();
while ((x = WaitForMultipleObjectsEx(i, hList, FALSE, maxTimeout, TRUE)) != WAIT_IO_COMPLETION)
{
elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1);
if (x == WAIT_FAILED || x == WAIT_TIMEOUT || (x-WAIT_OBJECT_0) == 0) { break; }
data = (ILibProcessPipe_WaitHandle*)hList[x + FD_SETSIZE];
manager->activeWaitHandle = data;
if (data != NULL && data->callback != NULL)
{
if (data->callback(data->event, ILibWaitHandle_ErrorStatus_NONE, data->user) == FALSE)
{
// FALSE means to remove the WaitHandle
void *node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event);
if (node != NULL)
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
break;
}
}
}
if (maxTimeout != INFINITE)
{
// Need to recalculate maxTimeout
for (jx = 1; jx < i; ++jx)
{
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data->timeout > 0) // Timeout was specified
{
if (data->timeRemaining > (int)elapsedTime) // We can just decrement time left
{
data->timeRemaining -= (int)elapsedTime;
}
else
{
data->timeRemaining = 0; // Already expired
}
if (data->timeRemaining < (int)maxTimeout)
{
maxTimeout = (DWORD)data->timeRemaining; // Set the new maxTimeout
}
}
}
}
}
if (x == WAIT_IO_COMPLETION) { elapsedTime = (DWORD)((uint64_t)ILibGetUptime() - timestamp1); }
if (x == WAIT_FAILED)
{
for (jx = 1; jx < i; ++jx)
{
if (WaitForSingleObject(hList[jx], 0) == WAIT_FAILED)
{
// This handle is invalid, so let's error it out and remove it
node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, hList[jx]);
if (node != NULL)
{
// Propagate an Error up
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data != NULL && data->callback != NULL)
{
manager->activeWaitHandle = data;
data->callback(data->event, ILibWaitHandle_ErrorStatus_INVALID_HANDLE, data->user);
}
// Remove the wait handle
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
break;
}
}
}
}
if (x == WAIT_TIMEOUT)
{
for (jx = 1; jx < i; ++jx)
{
data = (ILibProcessPipe_WaitHandle*)hList[jx + FD_SETSIZE];
if (data->timeout > 0)
{
// A timeout was specified, so lets check it
if (data->timeRemaining <= (int)elapsedTime)
{
data->timeRemaining = 0;
}
else
{
data->timeRemaining -= (int)elapsedTime;
}
if (data->timeRemaining == 0)
{
manager->activeWaitHandle = data;
if (data->callback == NULL || data->callback(data->event, ILibWaitHandle_ErrorStatus_TIMEOUT, data->user) == FALSE)
{
// Remove Wait Handle
node = ILibLinkedList_GetNode_Search(manager->ActivePipes, ILibProcessPipe_Manager_WindowsWaitHandles_Remove_Comparer, data->event);
if (node != NULL)
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
}
}
else
{
// Reset timeout
data->timeRemaining = data->timeout;
}
}
}
}
elapsedTime = 0; // Set this to zero, becuase we already decremented everyone's time remaining
}
ResetEvent(manager->updateEvent);
// FAILED
ILibMemory_Free(waitHandle);
}
//
// We need to enumerate the list of handles, and free them, because now that this thread
// is not in an alertable state, the parent will not be able to Queue any actions for cleanup
//
while ((node = ILibLinkedList_GetNode_Head(manager->ActivePipes)) != NULL)
else
{
free(ILibLinkedList_GetDataFromNode(node));
ILibLinkedList_Remove(node);
ILibLinkedList_AddTail(manager->ActivePipes, waitHandle);
}
}
ILibProcessPipe_Manager_WindowsRunLoop(void *arg)
{
ILib_DumpEnabledContext winException;
__try
{
ILibProcessPipe_Manager_WindowsRunLoopEx(arg);
}
__except (ILib_WindowsExceptionFilterEx(GetExceptionCode(), GetExceptionInformation(), &winException))
{
ILib_WindowsExceptionDebugEx(&winException);
}
}
void ILibProcessPipe_Manager_Start(void* chain, void* user)
{
ILibProcessPipe_Manager_Object* man = (ILibProcessPipe_Manager_Object*)user;
UNREFERENCED_PARAMETER(chain);
man->workerThread = ILibSpawnNormalThread((voidfp1)&ILibProcessPipe_Manager_WindowsRunLoop, user);
}
#else
void ILibProcessPipe_Process_ReadHandler(void* user);
void ILibProcessPipe_Manager_OnPreSelect(void* object, fd_set *readset, fd_set *writeset, fd_set *errorset, int* blocktime)
@@ -522,9 +346,7 @@ void ILibProcessPipe_Manager_OnDestroy(void *object)
ILibProcessPipe_Manager_Object *man = (ILibProcessPipe_Manager_Object*)object;
#ifdef WIN32
man->abort = 1;
SetEvent(man->updateEvent);
WaitForSingleObject(man->workerThread, INFINITE);
// ToDo: Enumerate List, and unregister everything
#endif
ILibLinkedList_Destroy(man->ActivePipes);
}
@@ -538,10 +360,7 @@ ILibProcessPipe_Manager ILibProcessPipe_Manager_Create(void *chain)
retVal->ChainLink.ParentChain = chain;
retVal->ActivePipes = ILibLinkedList_CreateEx(sizeof(int));
#ifdef WIN32
retVal->updateEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
ILibChain_RunOnMicrostackThread(chain, ILibProcessPipe_Manager_Start, retVal);
#else
#ifndef WIN32
retVal->ChainLink.PreSelectHandler = &ILibProcessPipe_Manager_OnPreSelect;
retVal->ChainLink.PostSelectHandler = &ILibProcessPipe_Manager_OnPostSelect;
#endif
@@ -716,13 +535,15 @@ void ILibProcessPipe_Process_Destroy(ILibProcessPipe_Process_Object *p)
if (!ILibMemory_CanaryOK(p)) { return; }
#ifdef WIN32
ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data));
data->pj = p;
data->h = CreateEvent(NULL, TRUE, FALSE, NULL);
// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread.
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data);
WaitForSingleObjectEx(data->h, 3000, TRUE);
CloseHandle(data->h);
//ToDo: Do Something here
//ILibProcessPipe_Process_Destroy_WinRunThread_Data *data = ILibMemory_AllocateA(sizeof(ILibProcessPipe_Process_Destroy_WinRunThread_Data));
//data->pj = p;
//data->h = CreateEvent(NULL, TRUE, FALSE, NULL);
//// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread.
//QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_Destroy_WinRunThread, p->parent->workerThread, (ULONG_PTR)data);
//WaitForSingleObjectEx(data->h, 3000, TRUE);
//CloseHandle(data->h);
#else
if (p->exiting != 0) { return; }
if (p->stdIn != NULL) { ILibProcessPipe_FreePipe(p->stdIn); }
@@ -1270,52 +1091,8 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject)
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData);
if (p->manager->workerThreadID == GetCurrentThreadId())
{
// Already on the dispatch thread, so we can directly update the list
if (p->mOverlapped_opaqueData == NULL)
{
void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle);
if (node != NULL)
{
ILibLinkedList_Remove(node);
p->mOverlapped_opaqueData = p->manager->activeWaitHandle;
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData);
SetEvent(p->manager->updateEvent); // Just need to set event, so the wait list will be rebuilt
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list");
}
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Already paused");
}
}
else
{
// Not on the dispatch thread, so we'll have to dispatch
p->mOverlapped_opaqueData = NULL;
ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent);
}
//if (p->mOverlapped_opaqueData == NULL)
//{
// void *node = ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, p->manager->activeWaitHandle);
// if (node != NULL)
// {
// ILibLinkedList_Remove(node);
// p->mOverlapped_opaqueData = p->manager->activeWaitHandle;
// ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "...Opaque = %p", (void*)p->mOverlapped_opaqueData);
// SetEvent(p->manager->updateEvent);
// }
// else
// {
// ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Active pipe not in list");
// }
//}
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause()");
ILibProcessPipe_WaitHandle_Remove(p->manager, p->mOverlapped->hEvent);
}
#else
ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject));
@@ -1368,59 +1145,20 @@ void ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(ILibProcessPipe_PipeObject
}
p->processingLoop = 0;
}
#ifdef WIN32
void __stdcall ILibProcessPipe_Pipe_ResumeEx_APC(ULONG_PTR obj)
{
ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)obj;
ILibProcessPipe_Pipe_ResumeEx_ContinueProcessing(p);
if (p->PAUSED == 0)
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Adding Pipe [%p]", (void*)p);
if (p->mOverlapped_opaqueData != NULL)
{
ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData);
p->mOverlapped_opaqueData = NULL;
}
else
{
ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler);
}
//printf("ReadFile(%p, %d, %d) (ResumeEx_APC)\n", p->mPipe_ReadEnd, p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead);
ReadFile(p->mPipe_ReadEnd, p->buffer + p->readOffset + p->totalRead, p->bufferSize - p->readOffset - p->totalRead, NULL, p->mOverlapped);
}
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx_APC(): Skipping Pipe [%p]", (void*)p);
}
}
#endif
void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p)
{
if (!ILibMemory_CanaryOK(p)) { return; }
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.ResumeEx(): processingLoop = %d", p->processingLoop);
#ifdef WIN32
if (p->manager->workerThreadID == GetCurrentThreadId())
if (!ILibIsRunningOnChainThread(p->manager->ChainLink.ParentChain))
{
// We were called from the event dispatch (ie: we're on the same thread)
if (p->mOverlapped_opaqueData != NULL)
{
ILibProcessPipe_WaitHandle_AddEx(p->manager, (ILibProcessPipe_WaitHandle*)p->mOverlapped_opaqueData);
p->mOverlapped_opaqueData = NULL;
}
else
{
ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler);
}
p->PAUSED = 0;
}
else
{
// We're on a different thread, so we need to dispatch to the worker thread
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Pipe_ResumeEx_APC, p->manager->workerThread, (ULONG_PTR)p);
QueueUserAPC((PAPCFUNC)NULL, ILibChain_GetMicrostackThreadHandle(p->manager->ChainLink.ParentChain), (ULONG_PTR)p);
return;
}
ILibProcessPipe_WaitHandle_Add(p->manager, p->mOverlapped->hEvent, p, ILibProcessPipe_Process_ReadHandler);
p->PAUSED = 0;
return;
#endif
@@ -1582,8 +1320,11 @@ void ILibProcessPipe_Process_OnExit_ChainSink(void *chain, void *user)
j->exitHandler(j, exitCode, j->userObject);
j->exiting ^= 1;
if (j->exiting == 0) { ILibProcessPipe_Process_Destroy(j); }
// We can't destroy this now, because we're on the MicrostackThread. We must destroy this on the WindowsRunLoop Thread.
QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j);
//QueueUserAPC((PAPCFUNC)ILibProcessPipe_Process_OnExit_ChainSink_DestroySink, j->parent->workerThread, (ULONG_PTR)j);
}
#ifdef WIN32
void __stdcall ILibProcessPipe_Process_OnExit_ChainSink_APC(ULONG_PTR obj)