1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-11 05:43:33 +00:00

Added ability to Save/Restore handle wait state

Fixed edge case that could cause a connection to resume twice
This commit is contained in:
Bryan Roe
2020-05-20 12:05:38 -07:00
parent f2d63ea116
commit 16a2f437c3
4 changed files with 102 additions and 2 deletions

View File

@@ -701,6 +701,8 @@ duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx)
duk_call_method(ctx, 2); duk_pop(ctx); // ...
if (rstream->paused != 0)
{
rstream->paused = 0; // Set state now, so nobody tries to resume before we can finish piping
// We are paused, so we should yield and resume... We yield, so in case the user tries to chain multiple pipes, it will chain first
rstream->resumeImmediate = ILibDuktape_Immediate(ctx, (void*[]) { rstream, duk_get_heapptr(ctx, 0) }, 1, ILibDuktape_ReadableStream_pipe_ResumeLater);
duk_push_heapptr(ctx, rstream->resumeImmediate); // [immediate]

View File

@@ -94,7 +94,7 @@ typedef struct ILibDuktape_net_WindowsIPC
OVERLAPPED overlapped;
ILibDuktape_DuplexStream *ds;
BOOL clientConnected;
void *reservedState;
ULONG_PTR _reserved[5];
char *buffer;
@@ -947,13 +947,23 @@ void ILibDuktape_net_server_IPC_PauseSink(ILibDuktape_DuplexStream *sender, void
// No-OP, becuase all we need to so is set Paused flag, which is already the case when we get here
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user;
winIPC->paused = 1;
winIPC->reservedState = ILibChain_WaitHandle_RemoveAndSaveState(winIPC->mChain, winIPC->read_overlapped.hEvent);
}
void ILibDuktape_net_server_IPC_ResumeSink(ILibDuktape_DuplexStream *sender, void *user)
{
ILibDuktape_net_WindowsIPC *winIPC = (ILibDuktape_net_WindowsIPC*)user;
winIPC->paused = 0;
if (winIPC->reservedState != NULL)
{
ILibChain_WaitHandle_RestoreState(winIPC->mChain, winIPC->reservedState);
winIPC->reservedState = NULL;
}
else
{
ILibDuktape_server_ipc_ReadSink(winIPC->mChain, winIPC->mPipeHandle, ILibWaitHandle_ErrorStatus_NONE, NULL, 0, winIPC);
}
}
ILibTransport_DoneState ILibDuktape_net_server_IPC_WriteSink(ILibDuktape_DuplexStream *stream, char *buffer, int bufferLen, void *user)
{

View File

@@ -3274,7 +3274,55 @@ void ILibChain_ReadEx2(void *chain, HANDLE h, OVERLAPPED *p, char *buffer, int b
}
BOOL ILibChain_WaitHandleAdded(void *chain, HANDLE h)
{
return(ILibLinkedList_GetNode_Search(((ILibBaseChain*)chain)->auxSelectHandles, NULL, h) != NULL);
}
void* ILibChain_WaitHandle_RemoveAndSaveState(void *chain, HANDLE h)
{
ILibChain_WaitHandleInfo *ret = NULL;
void *node = ILibLinkedList_GetNode_Search(((ILibBaseChain*)chain)->auxSelectHandles, NULL, h);
if (node != NULL)
{
ret = (ILibChain_WaitHandleInfo*)ILibMemory_SmartAllocate(sizeof(ILibChain_WaitHandleInfo));
memcpy_s(ret, ILibMemory_Size(ret), ILibMemory_Extra(node), ILibMemory_Size(ret));
ret->node = h;
ILibLinkedList_Remove(node);
if (((ILibBaseChain*)chain)->currentHandle == h)
{
((ILibBaseChain*)chain)->currentHandle = NULL; ((ILibBaseChain*)chain)->currentInfo = NULL;
}
((ILibBaseChain*)chain)->UnblockFlag = 1;
}
return(ret);
}
void ILibChain_WaitHandle_RestoreState(void *chain, void *state)
{
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)state;
int msTIMEOUT = -1;
struct timeval current;
if (tvnonzero(&(info->expiration)))
{
// Expiration was specified
ILibGetTimeOfDay(&current);
if (tv2LTEtv1(&(info->expiration), &current))
{
// Expiration happened in the past
msTIMEOUT = 1;
}
else
{
// Expiration is in the future
msTIMEOUT = ILibGetMillisecondTimeSpan(&(info->expiration), &current);
}
}
ILibChain_AddWaitHandleEx(chain, info->node, msTIMEOUT, info->handler, info->user, info->metadata);
ILibMemory_Free(info);
}
void __stdcall ILibChain_AddWaitHandle_apc(ULONG_PTR u)
{
void *chain = ((void**)u)[0];
@@ -8782,6 +8830,42 @@ void ILibGetDiskFreeSpace(void *i64FreeBytesToCaller, void *i64TotalBytes)
*((uint64_t *)i64TotalBytes)= (uint64_t)stfs.f_blocks * stfs.f_bsize;
#endif
}
int ILibGetMillisecondTimeSpan(struct timeval *tv1, struct timeval *tv2)
{
struct timeval a;
struct timeval b;
int seconds;
if (tv2LTtv1(tv1, tv2))
{
memcpy_s(&a, sizeof(a), tv2, sizeof(a));
memcpy_s(&b, sizeof(b), tv1, sizeof(b));
}
else
{
memcpy_s(&b, sizeof(b), tv2, sizeof(b));
memcpy_s(&a, sizeof(a), tv1, sizeof(a));
}
if (b.tv_usec < a.tv_usec)
{
seconds = (a.tv_usec - b.tv_usec) / 1000000 + 1;
a.tv_usec -= (1000000 * seconds);
a.tv_sec += seconds;
}
if (b.tv_usec - a.tv_usec > 1000000)
{
seconds = (b.tv_usec - a.tv_usec) / 1000000;
a.tv_usec += (1000000 * seconds);
a.tv_sec -= seconds;
}
// Millisecond Span
return(((b.tv_sec - a.tv_sec) * 1000) + ((b.tv_usec - a.tv_usec) / 1000));
}
long ILibGetTimeStamp()
{
struct timeval tv;

View File

@@ -999,6 +999,10 @@ int ILibIsRunningOnChainThread(void* chain);
}ILibChain_WriteEx_data;
char *ILibChain_MetaData(char *file, int number);
int ILibGetMillisecondTimeSpan(struct timeval *tv1, struct timeval *tv2);
void* ILibChain_WaitHandle_RemoveAndSaveState(void *chain, HANDLE h);
void ILibChain_WaitHandle_RestoreState(void *chain, void *state);
BOOL ILibChain_WaitHandleAdded(void *chain, HANDLE h);
void ILibChain_AddWaitHandleEx(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user, char *metadata);
#define ILibChain_AddWaitHandle(chain, h, msTIMEOUT, handler, user) ILibChain_AddWaitHandleEx(chain, h, msTIMEOUT, handler, user, ILibChain_MetaData(__FILE__, __LINE__))
void ILibChain_RemoveWaitHandle(void *chain, HANDLE h);