1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2026-02-18 02:19:02 +00:00

1. Added ILibChain_WriteEx()

2. Updated ILibChain_Continue() on windows to take an optional list of HANDLE**
3. Updated net.socket IPC on Windows  to use ILibChain_ReadEx and ILibChain_WriteEx
4. Fixed child_process.waitExit() on windows to pass only the wait handles for the process
5. Added GetWaitHandles() to ILibProcessPipe
This commit is contained in:
Bryan Roe
2020-05-15 17:25:49 -07:00
parent 89cafa3ba4
commit 97ad48f2f2
7 changed files with 344 additions and 199 deletions

View File

@@ -2091,7 +2091,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s
}
return(slct);
}
void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, struct timeval *tv, DWORD *timeout, fd_set *readset, fd_set *writeset, fd_set *errorset, ILibLinkedList handleList)
void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, struct timeval *tv, DWORD *timeout, fd_set *readset, fd_set *writeset, fd_set *errorset, ILibLinkedList handleList, HANDLE **onlyHandles)
{
HANDLE selectHandles[FD_SETSIZE];
memset(selectHandles, 0, sizeof(selectHandles));
@@ -2101,7 +2101,7 @@ void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, stru
*waitListCount = 0;
return;
}
int chkIndex;
void *node;
struct timeval currentTime;
struct timeval expirationTime;
@@ -2151,6 +2151,22 @@ void ILibChain_SetupWindowsWaitObject(HANDLE* waitList, int *waitListCount, stru
node = ILibLinkedList_GetNode_Head(handleList);
while (node != NULL)
{
if (onlyHandles != NULL)
{
for (chkIndex = 0; onlyHandles[chkIndex] != NULL; ++chkIndex)
{
if ((HANDLE)ILibLinkedList_GetDataFromNode(node) == onlyHandles[chkIndex])
{
chkIndex = -1;
break;
}
}
if (chkIndex != -1)
{
node = ILibLinkedList_GetNextNode(node);
continue;
}
}
i = x++;
if (waitList[i] != NULL && waitList[ILibChain_HandleInfoIndex(i)] == NULL)
{
@@ -2184,7 +2200,11 @@ ILibChain_ContinuationStates ILibChain_GetContinuationState(void *chain)
{
return(((ILibBaseChain*)chain)->continuationState);
}
#ifdef WIN32
ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, int moduleCount, int maxTimeout, HANDLE **handles)
#else
ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules, int moduleCount, int maxTimeout)
#endif
{
ILibBaseChain *chain = (ILibBaseChain*)Chain;
ILibChain_Link_Hook *nodeHook;
@@ -2207,6 +2227,11 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules,
gettimeofday(&startTime, NULL);
ILibRemoteLogging_printf(ILibChainGetLogger(chain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ContinueChain...");
#ifdef WIN32
HANDLE currentHandle = chain->currentHandle;
ILibChain_WaitHandleInfo* currentInfo = chain->currentInfo;
#endif
while (root->TerminateFlag == 0 && root->continuationState == ILibChain_ContinuationState_CONTINUE)
{
if (maxTimeout > 0)
@@ -2285,7 +2310,7 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules,
int x = 0;
DWORD waitTimeout = 0;
ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles);
ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles, handles);
slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout);
#else
slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv);
@@ -2351,6 +2376,10 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules,
ILibRemoteLogging_printf(ILibChainGetLogger(chain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ContinueChain...Ending...");
root->node = currentNode;
#ifdef WIN32
root->currentHandle = currentHandle;
root->currentInfo = currentInfo;
#endif
}
ILibExportMethod void ILibChain_EndContinue(void *chain)
@@ -2991,16 +3020,46 @@ void *ILibChain_GetObjectForDescriptor(void *chain, int fd)
}
#ifdef WIN32
BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user)
BOOL ILibChain_WriteEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user)
{
ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user;
DWORD bytesRead = 0;
ILibChain_WriteEx_data *data = (ILibChain_WriteEx_data*)user;
DWORD bytesWritten = 0;
if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0)
if (GetOverlappedResult(data->fileHandle, data->p, &bytesWritten, FALSE) && bytesWritten > 0)
{
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); }
ILibMemory_Free(data);
return(FALSE);
data->bytesLeft -= (int)bytesWritten;
data->totalWritten += (int)bytesWritten;
data->buffer = data->buffer + bytesWritten;
if (data->bytesLeft == 0)
{
// Done Writing
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->totalWritten, data->user); }
ILibMemory_Free(data);
return(FALSE);
}
else
{
// More Data to write
BOOL ret = FALSE;
switch (ILibChain_WriteEx(chain, h, data->p, data->buffer, data->bytesLeft, ILibChain_WriteEx_Sink, data))
{
case ILibTransport_DoneState_COMPLETE:
data->totalWritten += data->bytesLeft;
data->bytesLeft = 0;
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->totalWritten, data->user); }
ILibMemory_Free(data);
ret = FALSE;
break;
case ILibTransport_DoneState_INCOMPLETE:
ret = TRUE;
case ILibTransport_DoneState_ERROR:
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, 0, data->user); }
ILibMemory_Free(data);
ret = FALSE;
break;
}
return(ret);
}
}
else
{
@@ -3010,6 +3069,35 @@ BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus sta
return(TRUE);
}
else
{
// ERROR
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, 0, data->user); }
ILibMemory_Free(data);
return(FALSE);
}
}
}
BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user)
{
ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user;
DWORD bytesRead = 0;
DWORD err;
if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0)
{
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); }
ILibMemory_Free(data);
return(FALSE);
}
else
{
if ((err=GetLastError()) == ERROR_IO_PENDING)
{
// Still pending, so wait for another callback
return(TRUE);
}
else
{
// ERROR
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, data->buffer, 0, data->user); }
@@ -3018,6 +3106,37 @@ BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus sta
}
}
}
ILibTransport_DoneState ILibChain_WriteEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_WriteEx_Handler handler, void *user)
{
int e = 0;
if (!WriteFile(h, buffer, (DWORD)bufferLen, NULL, p))
{
if ((e = GetLastError()) == ERROR_IO_PENDING)
{
// Completing Asynchronously
ILibChain_WriteEx_data *state = (ILibChain_WriteEx_data*)ILibMemory_SmartAllocate(sizeof(ILibChain_WriteEx_data));
state->buffer = buffer;
state->bytesLeft = bufferLen;
state->totalWritten = 0;
state->p = p;
state->handler = handler;
state->fileHandle = h;
state->user = user;
ILibChain_AddWaitHandle(chain, p->hEvent, -1, ILibChain_WriteEx_Sink, state);
return(ILibTransport_DoneState_INCOMPLETE);
}
else
{
// IO Error
return(ILibTransport_DoneState_ERROR);
}
}
else
{
// Write Completed
return(ILibTransport_DoneState_COMPLETE);
}
}
void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user)
{
DWORD bytesRead = 0;
@@ -3080,26 +3199,30 @@ void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_Wai
return;
}
ILibChain_WaitHandleInfo *info = NULL;
if (((ILibBaseChain*)chain)->currentHandle != h)
{
void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h);
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node);
info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node);
info->node = node;
ILibForceUnBlockChain(chain);
}
else
{
((ILibBaseChain*)chain)->currentHandle = NULL;
info = ((ILibBaseChain*)chain)->currentInfo;
}
if (info != NULL)
{
info->handler = handler;
info->user = user;
info->node = node;
if (msTIMEOUT != INFINITE && msTIMEOUT >= 0)
{
ILibGetTimeOfDay(&(info->expiration));
info->expiration.tv_sec += (long)(msTIMEOUT / 1000);
info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000);
}
ILibForceUnBlockChain(chain);
}
else
{
// We are trying to add ourselves, so we can optimize by not deleting, intead of adding new
((ILibBaseChain*)chain)->currentHandle = NULL;
if (((ILibBaseChain*)chain)->currentInfo->user != user) { ((ILibBaseChain*)chain)->currentInfo->user = user; }
}
}
void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u)
@@ -3114,7 +3237,10 @@ void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u)
// We found the HANDLE, so if we remove the HANDLE from the list, and
// set the unblock flag, we'll be good to go
//
if (chain->currentHandle == h) { chain->currentHandle = NULL; chain->currentInfo = NULL; }
if (chain->currentHandle == h)
{
chain->currentHandle = NULL; chain->currentInfo = NULL;
}
ILibLinkedList_Remove(node);
chain->UnblockFlag = 1;
}
@@ -3292,7 +3418,7 @@ ILibExportMethod void ILibStartChain(void *Chain)
int x = 0;
DWORD waitTimeout = 0;
ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles);
ILibChain_SetupWindowsWaitObject(chain->WaitHandles, &x, &tv, &waitTimeout, &readset, &writeset, &errorset, chain->auxSelectHandles, NULL);
slct = ILibChain_WindowsSelect(chain, &readset, &writeset, &errorset, chain->WaitHandles, x, waitTimeout);
#else
slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv);

View File

@@ -974,7 +974,9 @@ int ILibIsRunningOnChainThread(void* chain);
void *ILibChain_GetObjectForDescriptor(void *chain, int fd);
char *ILibChain_GetMetaDataFromDescriptorSet(void *chain, fd_set *inr, fd_set *inw, fd_set *ine);
#ifdef WIN32
typedef void(*ILib_GenericReadHandler)(char *buffer, int bufferLen, int* bytesConsumed, void* user1, void *user2);
typedef BOOL(*ILibChain_ReadEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user);
typedef BOOL(*ILibChain_WriteEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, int bytesWritten, void* user);
typedef struct ILibChain_ReadEx_data
{
char *buffer;
@@ -983,9 +985,20 @@ int ILibIsRunningOnChainThread(void* chain);
OVERLAPPED *p;
void *user;
}ILibChain_ReadEx_data;
typedef struct ILibChain_WriteEx_data
{
ILibChain_WriteEx_Handler handler;
char *buffer;
int bytesLeft;
int totalWritten;
HANDLE fileHandle;
OVERLAPPED *p;
void *user;
}ILibChain_WriteEx_data;
void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user);
void ILibChain_RemoveWaitHandle(void *chain, HANDLE h);
void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user);
ILibTransport_DoneState ILibChain_WriteEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_WriteEx_Handler handler, void *user);
#define tv2LTtv1(ptv1, ptv2) ((ptv2)->tv_sec < (ptv1)->tv_sec || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec < (ptv1)->tv_usec))
#define tv2LTEtv1(ptv1, ptv2) (tv2LTtv1(ptv2,ptv1) || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec <= (ptv1)->tv_usec))
#define tvnonzero(ptv) ((ptv)->tv_sec != 0 || (ptv)->tv_usec != 0)
@@ -993,7 +1006,11 @@ int ILibIsRunningOnChainThread(void* chain);
ILibExportMethod void ILibStartChain(void *chain);
ILibExportMethod void ILibStopChain(void *chain);
#ifdef WIN32
ILibExportMethod void ILibChain_Continue(void *chain, ILibChain_Link **modules, int moduleCount, int maxTimeout, HANDLE **handles);
#else
ILibExportMethod void ILibChain_Continue(void *chain, ILibChain_Link **modules, int moduleCount, int maxTimeout);
#endif
ILibExportMethod void ILibChain_EndContinue(void *chain);
ILibChain_ContinuationStates ILibChain_GetContinuationState(void *chain);
#define ILibChain_FreeLink(link) ((ILibChain_Link*)link)->RESERVED = 0xFFFFFFFF;free(link);

View File

@@ -1318,6 +1318,16 @@ void ILibProcessPipe_Process_AddHandlers(ILibProcessPipe_Process module, int buf
#endif
}
}
#ifdef WIN32
void ILibProcessPipe_Process_GetWaitHandles(ILibProcessPipe_Process p, HANDLE *hProcess, HANDLE *read, HANDLE *write, HANDLE *error)
{
ILibProcessPipe_Process_Object* j = (ILibProcessPipe_Process_Object*)p;
*hProcess = j->hProcess;
*read = j->stdOut->mOverlapped->hEvent;
*error = j->stdErr->mOverlapped->hEvent;
*write = j->stdIn->mOverlapped->hEvent;
}
#endif
void ILibProcessPipe_Pipe_Close(ILibProcessPipe_Pipe po)
{
ILibProcessPipe_PipeObject* pipeObject = (ILibProcessPipe_PipeObject*)po;

View File

@@ -86,6 +86,9 @@ void ILibProcessPipe_Process_RemoveHandlers(ILibProcessPipe_Process module);
void ILibProcessPipe_Process_UpdateUserObject(ILibProcessPipe_Process module, void *userObj);
ILibTransport_DoneState ILibProcessPipe_Process_WriteStdIn(ILibProcessPipe_Process p, char* buffer, int bufferLen, ILibTransport_MemoryOwnership ownership);
void ILibProcessPipe_Process_CloseStdIn(ILibProcessPipe_Process p);
#ifdef WIN32
void ILibProcessPipe_Process_GetWaitHandles(ILibProcessPipe_Process p, HANDLE *hProcess, HANDLE *read, HANDLE *write, HANDLE *error);
#endif
void ILibProcessPipe_Pipe_Close(ILibProcessPipe_Pipe po);
void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject);