diff --git a/microscript/ILibDuktape_ReadableStream.c b/microscript/ILibDuktape_ReadableStream.c index bc178c5..f96866f 100644 --- a/microscript/ILibDuktape_ReadableStream.c +++ b/microscript/ILibDuktape_ReadableStream.c @@ -701,6 +701,8 @@ duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx) duk_call_method(ctx, 2); duk_pop(ctx); // ... if (rstream->paused != 0) { + rstream->paused = 0; // Set state now, so nobody tries to resume before we can finish piping + // We are paused, so we should yield and resume... We yield, so in case the user tries to chain multiple pipes, it will chain first rstream->resumeImmediate = ILibDuktape_Immediate(ctx, (void*[]) { rstream, duk_get_heapptr(ctx, 0) }, 1, ILibDuktape_ReadableStream_pipe_ResumeLater); duk_push_heapptr(ctx, rstream->resumeImmediate); // [immediate] diff --git a/microscript/ILibDuktape_net.c b/microscript/ILibDuktape_net.c index f1677d3..31b9abe 100644 --- a/microscript/ILibDuktape_net.c +++ b/microscript/ILibDuktape_net.c @@ -94,7 +94,7 @@ typedef struct ILibDuktape_net_WindowsIPC OVERLAPPED overlapped; ILibDuktape_DuplexStream *ds; BOOL clientConnected; - + void *reservedState; ULONG_PTR _reserved[5]; char *buffer; @@ -947,12 +947,22 @@ void ILibDuktape_net_server_IPC_PauseSink(ILibDuktape_DuplexStream *sender, void // No-OP, becuase all we need to so is set Paused flag, which is already the case when we get here ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; winIPC->paused = 1; + + winIPC->reservedState = ILibChain_WaitHandle_RemoveAndSaveState(winIPC->mChain, winIPC->read_overlapped.hEvent); } void ILibDuktape_net_server_IPC_ResumeSink(ILibDuktape_DuplexStream *sender, void *user) { ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user; winIPC->paused = 0; - ILibDuktape_server_ipc_ReadSink(winIPC->mChain, winIPC->mPipeHandle, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, winIPC); + if (winIPC->reservedState != NULL) + { + ILibChain_WaitHandle_RestoreState(winIPC->mChain, winIPC->reservedState); + winIPC->reservedState = NULL; + } + else + { + ILibDuktape_server_ipc_ReadSink(winIPC->mChain, winIPC->mPipeHandle, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, winIPC); + } } ILibTransport_DoneState ILibDuktape_net_server_IPC_WriteSink(ILibDuktape_DuplexStream *stream, char *buffer, int bufferLen, void *user) diff --git a/microstack/ILibParsers.c b/microstack/ILibParsers.c index c16e437..c3cb6ae 100644 --- a/microstack/ILibParsers.c +++ b/microstack/ILibParsers.c @@ -3274,7 +3274,55 @@ void ILibChain_ReadEx2(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int b } +BOOL ILibChain_WaitHandleAdded(void *chain, HANDLE h) +{ + return(ILibLinkedList_GetNode_Search(((ILibBaseChain*)chain)->auxSelectHandles, NULL, h) != NULL); +} +void* ILibChain_WaitHandle_RemoveAndSaveState(void *chain, HANDLE h) +{ + ILibChain_WaitHandleInfo *ret = NULL; + void *node = ILibLinkedList_GetNode_Search(((ILibBaseChain*)chain)->auxSelectHandles, NULL, h); + if (node != NULL) + { + ret = (ILibChain_WaitHandleInfo*)ILibMemory_SmartAllocate(sizeof(ILibChain_WaitHandleInfo)); + memcpy_s(ret, ILibMemory_Size(ret), ILibMemory_Extra(node), ILibMemory_Size(ret)); + ret->node = h; + ILibLinkedList_Remove(node); + if (((ILibBaseChain*)chain)->currentHandle == h) + { + ((ILibBaseChain*)chain)->currentHandle = NULL; ((ILibBaseChain*)chain)->currentInfo = NULL; + } + ((ILibBaseChain*)chain)->UnblockFlag = 1; + } + return(ret); +} +void ILibChain_WaitHandle_RestoreState(void *chain, void *state) +{ + ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)state; + int msTIMEOUT = -1; + struct timeval current; + + if (tvnonzero(&(info->expiration))) + { + // Expiration was specified + ILibGetTimeOfDay(¤t); + + if (tv2LTEtv1(&(info->expiration), ¤t)) + { + // Expiration happened in the past + msTIMEOUT = 1; + } + else + { + // Expiration is in the future + msTIMEOUT = ILibGetMillisecondTimeSpan(&(info->expiration), ¤t); + } + } + + ILibChain_AddWaitHandleEx(chain, info->node, msTIMEOUT, info->handler, info->user, info->metadata); + ILibMemory_Free(info); +} void __stdcall ILibChain_AddWaitHandle_apc(ULONG_PTR u) { void *chain = ((void**)u)[0]; @@ -8782,6 +8830,42 @@ void ILibGetDiskFreeSpace(void *i64FreeBytesToCaller, void *i64TotalBytes) *((uint64_t *)i64TotalBytes)= (uint64_t)stfs.f_blocks * stfs.f_bsize; #endif } + +int ILibGetMillisecondTimeSpan(struct timeval *tv1, struct timeval *tv2) +{ + struct timeval a; + struct timeval b; + int seconds; + + if (tv2LTtv1(tv1, tv2)) + { + memcpy_s(&a, sizeof(a), tv2, sizeof(a)); + memcpy_s(&b, sizeof(b), tv1, sizeof(b)); + } + else + { + memcpy_s(&b, sizeof(b), tv2, sizeof(b)); + memcpy_s(&a, sizeof(a), tv1, sizeof(a)); + } + + if (b.tv_usec < a.tv_usec) + { + seconds = (a.tv_usec - b.tv_usec) / 1000000 + 1; + a.tv_usec -= (1000000 * seconds); + a.tv_sec += seconds; + } + if (b.tv_usec - a.tv_usec > 1000000) + { + seconds = (b.tv_usec - a.tv_usec) / 1000000; + a.tv_usec += (1000000 * seconds); + a.tv_sec -= seconds; + } + + // Millisecond Span + return(((b.tv_sec - a.tv_sec) * 1000) + ((b.tv_usec - a.tv_usec) / 1000)); +} + + long ILibGetTimeStamp() { struct timeval tv; diff --git a/microstack/ILibParsers.h b/microstack/ILibParsers.h index b7dd92c..02f4316 100644 --- a/microstack/ILibParsers.h +++ b/microstack/ILibParsers.h @@ -999,6 +999,10 @@ int ILibIsRunningOnChainThread(void* chain); }ILibChain_WriteEx_data; char *ILibChain_MetaData(char *file, int number); + int ILibGetMillisecondTimeSpan(struct timeval *tv1, struct timeval *tv2); + void* ILibChain_WaitHandle_RemoveAndSaveState(void *chain, HANDLE h); + void ILibChain_WaitHandle_RestoreState(void *chain, void *state); + BOOL ILibChain_WaitHandleAdded(void *chain, HANDLE h); void ILibChain_AddWaitHandleEx(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user, char *metadata); #define ILibChain_AddWaitHandle(chain, h, msTIMEOUT, handler, user) ILibChain_AddWaitHandleEx(chain, h, msTIMEOUT, handler, user, ILibChain_MetaData(__FILE__, __LINE__)) void ILibChain_RemoveWaitHandle(void *chain, HANDLE h);