diff --git a/microstack/ILibParsers.c b/microstack/ILibParsers.c index 25def95..9c14ed0 100644 --- a/microstack/ILibParsers.c +++ b/microstack/ILibParsers.c @@ -921,6 +921,15 @@ struct ILibBaseChain_SafeData void *Object; }; +#ifdef WIN32 +typedef struct ILibChain_WaitHandleInfo +{ + void *node; + ILibChain_WaitHandleHandler handler; + void *user; + struct timeval expiration; +}ILibChain_WaitHandleInfo; +#endif typedef struct ILibBaseChain { @@ -949,6 +958,7 @@ typedef struct ILibBaseChain void* auxSelectHandles; HANDLE WaitHandles[FD_SETSIZE * 2]; HANDLE currentHandle; + ILibChain_WaitHandleInfo *currentInfo; #else pthread_t ChainThreadID; int TerminatePipe[2]; @@ -974,15 +984,6 @@ typedef struct ILibBaseChain void *node; }ILibBaseChain; -#ifdef WIN32 -typedef struct ILibChain_WaitHandleInfo -{ - void *node; - ILibChain_WaitHandleHandler handler; - void *user; - struct timeval expiration; -}ILibChain_WaitHandleInfo; -#endif void* ILibMemory_AllocateA_Init(void *buffer) { @@ -2026,6 +2027,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s { ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)waitList[ILibChain_HandleInfoIndex(slct)]; ((ILibBaseChain*)chain)->currentHandle = waitList[slct]; + ((ILibBaseChain*)chain)->currentInfo = info; waitList[ILibChain_HandleInfoIndex(slct)] = NULL; waitList[slct] = NULL; if (info->handler != NULL) @@ -2040,6 +2042,7 @@ int ILibChain_WindowsSelect(void *chain, fd_set *readset, fd_set *writeset, fd_s } } ((ILibBaseChain*)chain)->currentHandle = NULL; + ((ILibBaseChain*)chain)->currentInfo = NULL; } } if (slct == WAIT_TIMEOUT) @@ -2988,6 +2991,69 @@ void *ILibChain_GetObjectForDescriptor(void *chain, int fd) } #ifdef WIN32 +BOOL ILibChain_ReadEx_Sink(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, void *user) +{ + ILibChain_ReadEx_data *data = (ILibChain_ReadEx_data*)user; + DWORD bytesRead = 0; + + if (GetOverlappedResult(data->fileHandle, data->p, &bytesRead, FALSE) && bytesRead > 0) + { + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_NONE, data->buffer, bytesRead, data->user); } + ILibMemory_Free(data); + return(FALSE); + } + else + { + if (GetLastError() == ERROR_IO_PENDING) + { + // Still pending, so wait for another callback + return(TRUE); + } + else + { + // ERROR + if (data->handler != NULL) { data->handler(chain, data->fileHandle, ILibWaitHandle_ErrorStatus_IO_ERROR, data->buffer, 0, data->user); } + ILibMemory_Free(data); + return(FALSE); + } + } +} +void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user) +{ + DWORD bytesRead = 0; + int e = 0; + if (!ReadFile(h, buffer, bufferLen, &bytesRead, p)) + { + if ((e = GetLastError()) == ERROR_IO_PENDING) + { + ILibChain_ReadEx_data *state = (ILibChain_ReadEx_data*)ILibMemory_SmartAllocate(sizeof(ILibChain_ReadEx_data)); + state->buffer = buffer; + state->p = p; + state->handler = handler; + state->fileHandle = h; + state->user = user; + ILibChain_AddWaitHandle(chain, p->hEvent, -1, ILibChain_ReadEx_Sink, state); + } + else + { + if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_IO_ERROR, buffer, 0, user); } + } + } + else + { + if (bytesRead > 0) + { + if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_NONE, buffer, bytesRead, user); } + } + else + { + if (handler != NULL) { handler(chain, h, ILibWaitHandle_ErrorStatus_IO_ERROR, buffer, 0, user); } + } + } +} + + + void __stdcall ILibChain_AddWaitHandle_apc(ULONG_PTR u) { void *chain = ((void**)u)[0]; @@ -3014,20 +3080,27 @@ void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_Wai return; } - - void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h); - ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node); - info->handler = handler; - info->user = user; - info->node = node; - if (msTIMEOUT != INFINITE && msTIMEOUT >= 0) + if (((ILibBaseChain*)chain)->currentHandle != h) { - ILibGetTimeOfDay(&(info->expiration)); - info->expiration.tv_sec += (long)(msTIMEOUT / 1000); - info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000); + void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h); + ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node); + info->handler = handler; + info->user = user; + info->node = node; + if (msTIMEOUT != INFINITE && msTIMEOUT >= 0) + { + ILibGetTimeOfDay(&(info->expiration)); + info->expiration.tv_sec += (long)(msTIMEOUT / 1000); + info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000); + } + ILibForceUnBlockChain(chain); + } + else + { + // We are trying to add ourselves, so we can optimize by not deleting, intead of adding new + ((ILibBaseChain*)chain)->currentHandle = NULL; + if (((ILibBaseChain*)chain)->currentInfo->user != user) { ((ILibBaseChain*)chain)->currentInfo->user = user; } } - - ILibForceUnBlockChain(chain); } void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u) { @@ -3041,7 +3114,7 @@ void __stdcall ILibChain_RemoveWaitHandle_APC(ULONG_PTR u) // We found the HANDLE, so if we remove the HANDLE from the list, and // set the unblock flag, we'll be good to go // - if (chain->currentHandle == h) { chain->currentHandle = NULL; } + if (chain->currentHandle == h) { chain->currentHandle = NULL; chain->currentInfo = NULL; } ILibLinkedList_Remove(node); chain->UnblockFlag = 1; } diff --git a/microstack/ILibParsers.h b/microstack/ILibParsers.h index f8da004..2a34273 100644 --- a/microstack/ILibParsers.h +++ b/microstack/ILibParsers.h @@ -354,7 +354,8 @@ int ILibIsRunningOnChainThread(void* chain); ILibWaitHandle_ErrorStatus_INVALID_HANDLE = 1, ILibWaitHandle_ErrorStatus_TIMEOUT = 2, ILibWaitHandle_ErrorStatus_REMOVED = 3, - ILibWaitHandle_ErrorStatus_MANAGER_EXITING = 4 + ILibWaitHandle_ErrorStatus_MANAGER_EXITING = 4, + ILibWaitHandle_ErrorStatus_IO_ERROR = 5 }ILibWaitHandle_ErrorStatus; typedef BOOL(*ILibChain_WaitHandleHandler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus, void* user); #endif @@ -973,8 +974,18 @@ int ILibIsRunningOnChainThread(void* chain); void *ILibChain_GetObjectForDescriptor(void *chain, int fd); char *ILibChain_GetMetaDataFromDescriptorSet(void *chain, fd_set *inr, fd_set *inw, fd_set *ine); #ifdef WIN32 + typedef BOOL(*ILibChain_ReadEx_Handler)(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user); + typedef struct ILibChain_ReadEx_data + { + char *buffer; + ILibChain_ReadEx_Handler handler; + HANDLE fileHandle; + OVERLAPPED *p; + void *user; + }ILibChain_ReadEx_data; void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user); void ILibChain_RemoveWaitHandle(void *chain, HANDLE h); + void ILibChain_ReadEx(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int bufferLen, ILibChain_ReadEx_Handler handler, void *user); #define tv2LTtv1(ptv1, ptv2) ((ptv2)->tv_sec < (ptv1)->tv_sec || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec < (ptv1)->tv_usec)) #define tv2LTEtv1(ptv1, ptv2) (tv2LTtv1(ptv2,ptv1) || ((ptv2)->tv_sec == (ptv1)->tv_sec && (ptv2)->tv_usec <= (ptv1)->tv_usec)) #define tvnonzero(ptv) ((ptv)->tv_sec != 0 || (ptv)->tv_usec != 0) diff --git a/microstack/ILibProcessPipe.c b/microstack/ILibProcessPipe.c index 52db915..e5f2e04 100644 --- a/microstack/ILibProcessPipe.c +++ b/microstack/ILibProcessPipe.c @@ -1001,7 +1001,7 @@ void ILibProcessPipe_Pipe_Pause(ILibProcessPipe_Pipe pipeObject) else { ILibRemoteLogging_printf(ILibChainGetLogger(p->manager->ChainLink.ParentChain), ILibRemoteLogging_Modules_Microstack_Generic, ILibRemoteLogging_Flags_VerbosityLevel_1, "ProcessPipe.Pause(): Opaque = %p",(void*)p->mOverlapped_opaqueData); - ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent); + //ILibChain_RemoveWaitHandle(p->manager->ChainLink.ParentChain, p->mOverlapped->hEvent); } #else ILibLinkedList_Remove(ILibLinkedList_GetNode_Search(p->manager->ActivePipes, NULL, pipeObject)); @@ -1072,6 +1072,9 @@ void ILibProcessPipe_Pipe_ResumeEx(ILibProcessPipe_PipeObject* p) ILibLifeTime_Add(ILibGetBaseTimer(p->manager->ChainLink.ParentChain), p, 0, &ILibProcessPipe_Process_StartPipeReaderWriterEx, NULL); // Need to context switch to Chain Thread } } +#ifdef WIN32 +BOOL ILibProcessPipe_Process_Pipe_ReadExHandler(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user); +#endif void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject) { ILibProcessPipe_PipeObject *p = (ILibProcessPipe_PipeObject*)pipeObject; @@ -1083,7 +1086,9 @@ void ILibProcessPipe_Pipe_Resume(ILibProcessPipe_Pipe pipeObject) } else { - ILibProcessPipe_Pipe_ResumeEx(p); + //ILibProcessPipe_Pipe_ResumeEx(p); + p->PAUSED = 0; + ILibProcessPipe_Process_Pipe_ReadExHandler(p->manager->ChainLink.ParentChain, p->mPipe_ReadEnd, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, pipeObject); if (p->mProcess != NULL && p->mProcess->hProcess_needAdd != 0 && p->mProcess->disabled == 0) { p->mProcess->hProcess_needAdd = 0; @@ -1151,7 +1156,49 @@ DWORD ILibProcessPipe_Pipe_BackgroundReader(void *arg) return 0; } #endif - +#ifdef WIN32 +BOOL ILibProcessPipe_Process_Pipe_ReadExHandler(void *chain, HANDLE h, ILibWaitHandle_ErrorStatus status, char *buffer, int bytesRead, void* user) +{ + ILibProcessPipe_PipeObject *pipeObject = (ILibProcessPipe_PipeObject*)user; + ILibProcessPipe_GenericReadHandler handler = (ILibProcessPipe_GenericReadHandler)pipeObject->handler; + int consumed = 0; + if (status == ILibWaitHandle_ErrorStatus_NONE) + { + pipeObject->totalRead += bytesRead; + do + { + handler(pipeObject->buffer + pipeObject->readOffset, pipeObject->totalRead, &consumed, pipeObject->user1, pipeObject->user2); + pipeObject->readOffset += consumed; + pipeObject->totalRead -= consumed; + } while (pipeObject->PAUSED == 0 && consumed != 0 && pipeObject->totalRead > 0); + if (pipeObject->totalRead == 0) { pipeObject->readOffset = 0; } + if (pipeObject->PAUSED == 0) + { + if (pipeObject->readOffset > 0) + { + memmove_s(pipeObject->buffer, pipeObject->bufferSize, pipeObject->buffer + pipeObject->readOffset, pipeObject->totalRead); + pipeObject->readOffset = 0; + } + else if (pipeObject->totalRead == pipeObject->bufferSize) + { + ILibMemory_ReallocateRaw(&(pipeObject->buffer), pipeObject->bufferSize * 2); + pipeObject->bufferSize = pipeObject->bufferSize * 2; + } + ILibChain_ReadEx(chain, h, pipeObject->mOverlapped, pipeObject->buffer + pipeObject->readOffset + pipeObject->totalRead, pipeObject->bufferSize - pipeObject->totalRead, ILibProcessPipe_Process_Pipe_ReadExHandler, pipeObject); + return(TRUE); + } + else + { + return(FALSE); + } + } + else + { + // I/O Errors + return(FALSE); + } +} +#endif void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObject, int bufferSize, ILibProcessPipe_GenericReadHandler handler, void* user1, void* user2) { #ifdef WIN32 @@ -1168,10 +1215,12 @@ void ILibProcessPipe_Process_StartPipeReader(ILibProcessPipe_PipeObject *pipeObj if (pipeObject->mOverlapped != NULL) { // This PIPE supports Overlapped I/O - //printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize); - pipeObject->inProgress = 1; - result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped); - ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject); + ILibChain_ReadEx(pipeObject->manager->ChainLink.ParentChain, pipeObject->mPipe_ReadEnd, pipeObject->mOverlapped, pipeObject->buffer, pipeObject->bufferSize, ILibProcessPipe_Process_Pipe_ReadExHandler, pipeObject); + + ////printf("ReadFile(%p, %d, %d) (StartPipeReader)\n", pipeObject->mPipe_ReadEnd, 0, pipeObject->bufferSize); + //pipeObject->inProgress = 1; + //result = ReadFile(pipeObject->mPipe_ReadEnd, pipeObject->buffer, pipeObject->bufferSize, NULL, pipeObject->mOverlapped); + //ILibChain_AddWaitHandle(pipeObject->manager->ChainLink.ParentChain, pipeObject->mOverlapped->hEvent, -1, ILibProcessPipe_Process_ReadHandler, pipeObject); } else {