1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-06 00:13:33 +00:00

Updated windows path to use ReadEx when overlapped is supported

This commit is contained in:
Bryan Roe
2020-05-09 11:02:16 -07:00
parent b2b78a3dbe
commit a4c5430879
3 changed files with 163 additions and 30 deletions

View File

@@ -921,6 +921,15 @@ struct ILibBaseChain_SafeData
void *Object;
};
#ifdef WIN32
typedef struct ILibChain_WaitHandleInfo
{
void *node;
ILibChain_WaitHandleHandler handler;
void *user;
struct timeval expiration;
}ILibChain_WaitHandleInfo;
#endif
typedef struct ILibBaseChain
{
@@ -949,6 +958,7 @@ typedef struct ILibBaseChain
void* auxSelectHandles;
HANDLE WaitHandles[FD_SETSIZE * 2];
HANDLE currentHandle;
ILibChain_WaitHandleInfo *currentInfo;
#else
pthread_t ChainThreadID;
int TerminatePipe[2];
@@ -974,15 +984,6 @@ typedef struct ILibBaseChain
void *node;
}ILibBaseChain;
#ifdef WIN32
typedef struct ILibChain_WaitHandleInfo
{
void *node;
ILibChain_WaitHandleHandler handler;
void *user;
struct timeval expiration;
}ILibChain_WaitHandleInfo;
#endif
void* ILibMemory_AllocateA_Init(void *buffer)
{
@@ -2026,6 +2027,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s
{
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(slct)];
((ILibBaseChain*)chain)->currentHandle = waitList[slct];
((ILibBaseChain*)chain)->currentInfo = info;
waitList[ILibChain_HandleInfoIndex(slct)] = NULL;
waitList[slct] = NULL;
if (info->handler != NULL)
@@ -2040,6 +2042,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s
}
}
((ILibBaseChain*)chain)->currentHandle = NULL;
((ILibBaseChain*)chain)->currentInfo = NULL;
}
}
if (slct == WAIT_TIMEOUT)
@@ -2988,6 +2991,69 @@ void *ILibChain_GetObjectForDescriptor(void *chain, int fd)
}
#ifdef WIN32
BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user)
{
ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user;
DWORD bytesRead = 0;
if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0)
{
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); }
ILibMemory_Free(data);
return(FALSE);
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{
// Still pending, so wait for another callback
return(TRUE);
}
else
{
// ERROR
if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, data->buffer, 0, data->user); }
ILibMemory_Free(data);
return(FALSE);
}
}
}
void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user)
{
DWORD bytesRead = 0;
int e = 0;
if (!ReadFile(h, buffer, bufferLen, &bytesRead, p))
{
if ((e = GetLastError()) == ERROR_IO_PENDING)
{
ILibChain_ReadEx_data *state = (ILibChain_ReadEx_data*)ILibMemory_SmartAllocate(sizeof(ILibChain_ReadEx_data));
state->buffer = buffer;
state->p = p;
state->handler = handler;
state->fileHandle = h;
state->user = user;
ILibChain_AddWaitHandle(chain, p->hEvent, -1, ILibChain_ReadEx_Sink, state);
}
else
{
if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_IO_ERROR, buffer, 0, user); }
}
}
else
{
if (bytesRead > 0)
{
if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_NONE, buffer, bytesRead, user); }
}
else
{
if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_IO_ERROR, buffer, 0, user); }
}
}
}
void __stdcall ILibChain_AddWaitHandle_apc(ULONG_PTR u)
{
void *chain = ((void**)u)[0];
@@ -3014,20 +3080,27 @@ void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_Wai
return;
}
void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h);
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node);
info->handler = handler;
info->user = user;
info->node = node;
if (msTIMEOUT != INFINITE && msTIMEOUT >= 0)
if (((ILibBaseChain*)chain)->currentHandle != h)
{
ILibGetTimeOfDay(&(info->expiration));
info->expiration.tv_sec += (long)(msTIMEOUT / 1000);
info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000);
void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h);
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node);
info->handler = handler;
info->user = user;
info->node = node;
if (msTIMEOUT != INFINITE && msTIMEOUT >= 0)
{
ILibGetTimeOfDay(&(info->expiration));
info->expiration.tv_sec += (long)(msTIMEOUT / 1000);
info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000);
}
ILibForceUnBlockChain(chain);
}
else
{
// We are trying to add ourselves, so we can optimize by not deleting, intead of adding new
((ILibBaseChain*)chain)->currentHandle = NULL;
if (((ILibBaseChain*)chain)->currentInfo->user != user) { ((ILibBaseChain*)chain)->currentInfo->user = user; }
}
ILibForceUnBlockChain(chain);
}
void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u)
{
@@ -3041,7 +3114,7 @@ void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u)
// We found the HANDLE, so if we remove the HANDLE from the list, and
// set the unblock flag, we'll be good to go
//
if (chain->currentHandle == h) { chain->currentHandle = NULL; }
if (chain->currentHandle == h) { chain->currentHandle = NULL; chain->currentInfo = NULL; }
ILibLinkedList_Remove(node);
chain->UnblockFlag = 1;
}

View File

@@ -354,7 +354,8 @@ int ILibIsRunningOnChainThread(void* chain);
ILibWaitHandle_ErrorStatus_INVALID_HANDLE = 1,
ILibWaitHandle_ErrorStatus_TIMEOUT = 2,
ILibWaitHandle_ErrorStatus_REMOVED = 3,
ILibWaitHandle_ErrorStatus_MANAGER_EXITING = 4
ILibWaitHandle_ErrorStatus_MANAGER_EXITING = 4,
ILibWaitHandle_ErrorStatus_IO_ERROR = 5
}ILibWaitHandle_ErrorStatus;
typedef BOOL(*ILibChain_WaitHandleHandler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus, void* user);
#endif
@@ -973,8 +974,18 @@ int ILibIsRunningOnChainThread(void* chain);
void *ILibChain_GetObjectForDescriptor(void *chain, int fd);
char *ILibChain_GetMetaDataFromDescriptorSet(void *chain, fd_set *inr, fd_set *inw, fd_set *ine);
#ifdef WIN32
typedef BOOL(*ILibChain_ReadEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user);
typedef struct ILibChain_ReadEx_data
{
char *buffer;
ILibChain_ReadEx_Handler handler;
HANDLE fileHandle;
OVERLAPPED *p;
void *user;
}ILibChain_ReadEx_data;
void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user);
void ILibChain_RemoveWaitHandle(void *chain, HANDLE h);
void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user);
#define tv2LTtv1(ptv1, ptv2) ((ptv2)->tv_sec < (ptv1)->tv_sec || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec < (ptv1)->tv_usec))
#define tv2LTEtv1(ptv1, ptv2) (tv2LTtv1(ptv2,ptv1) || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec <= (ptv1)->tv_usec))
#define tvnonzero(ptv) ((ptv)->tv_sec != 0 || (ptv)->tv_usec != 0)

View File

@@ -1001,7 +1001,7 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject)
else
{
ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData);
ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent);
//ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent);
}
#else
ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject));
@@ -1072,6 +1072,9 @@ void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p)
ILibLifeTime_Add(ILibGetBaseTimer(p->manager->ChainLink.ParentChain), p, 0, &ILibProcessPipe_Process_StartPipeReaderWriterEx, NULL); // Need to context switch to Chain Thread
}
}
#ifdef WIN32
BOOL ILibProcessPipe_Process_Pipe_ReadExHandler(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user);
#endif
void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject)
{
ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)pipeObject;
@@ -1083,7 +1086,9 @@ void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject)
}
else
{
ILibProcessPipe_Pipe_ResumeEx(p);
//ILibProcessPipe_Pipe_ResumeEx(p);
p->PAUSED = 0;
ILibProcessPipe_Process_Pipe_ReadExHandler(p->manager->ChainLink.ParentChain, p->mPipe_ReadEnd, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, pipeObject);
if (p->mProcess != NULL && p->mProcess->hProcess_needAdd != 0 && p->mProcess->disabled == 0)
{
p->mProcess->hProcess_needAdd = 0;
@@ -1151,7 +1156,49 @@ DWORD ILibProcessPipe_Pipe_BackgroundReader(void *arg)
return 0;
}
#endif
#ifdef WIN32
BOOL ILibProcessPipe_Process_Pipe_ReadExHandler(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user)
{
ILibProcessPipe_PipeObject *pipeObject = (ILibProcessPipe_PipeObject*)user;
ILibProcessPipe_GenericReadHandler handler = (ILibProcessPipe_GenericReadHandler)pipeObject->handler;
int consumed = 0;
if (status == ILibWaitHandle_ErrorStatus_NONE)
{
pipeObject->totalRead += bytesRead;
do
{
handler(pipeObject->buffer + pipeObject->readOffset, pipeObject->totalRead, &consumed, pipeObject->user1, pipeObject->user2);
pipeObject->readOffset += consumed;
pipeObject->totalRead -= consumed;
} while (pipeObject->PAUSED == 0 && consumed != 0 && pipeObject->totalRead > 0);
if (pipeObject->totalRead == 0) { pipeObject->readOffset = 0; }
if (pipeObject->PAUSED == 0)
{
if (pipeObject->readOffset > 0)
{
memmove_s(pipeObject->buffer, pipeObject->bufferSize, pipeObject->buffer + pipeObject->readOffset, pipeObject->totalRead);
pipeObject->readOffset = 0;
}
else if (pipeObject->totalRead == pipeObject->bufferSize)
{
ILibMemory_ReallocateRaw(&(pipeObject->buffer), pipeObject->bufferSize * 2);
pipeObject->bufferSize = pipeObject->bufferSize * 2;
}
ILibChain_ReadEx(chain, h, pipeObject->mOverlapped, pipeObject->buffer + pipeObject->readOffset + pipeObject->totalRead, pipeObject->bufferSize - pipeObject->totalRead, ILibProcessPipe_Process_Pipe_ReadExHandler, pipeObject);
return(TRUE);
}
else
{
return(FALSE);
}
}
else
{
// I/O Errors
return(FALSE);
}
}
#endif
void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObject, int bufferSize, ILibProcessPipe_GenericReadHandler handler, void* user1, void* user2)
{
#ifdef WIN32
@@ -1168,10 +1215,12 @@ void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObj
if (pipeObject->mOverlapped != NULL)
{
// This PIPE supports Overlapped I/O
//printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize);
pipeObject->inProgress = 1;
result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped);
ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject);
ILibChain_ReadEx(pipeObject->manager->ChainLink.ParentChain, pipeObject->mPipe_ReadEnd, pipeObject->mOverlapped, pipeObject->buffer, pipeObject->bufferSize, ILibProcessPipe_Process_Pipe_ReadExHandler, pipeObject);
////printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize);
//pipeObject->inProgress = 1;
//result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped);
//ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject);
}
else
{