1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2026-01-04 17:43:45 +00:00

Updated Windows Chain logic, to use WSAEventSelect and WaitForMultipleObjectsEx

This commit is contained in:
Bryan Roe
2020-05-04 17:25:55 -07:00
parent 11da1118b4
commit 1bd4e3d07c
5 changed files with 207 additions and 81 deletions

View File

@@ -180,6 +180,7 @@ int ILibWhichPowerOfTwo(int number)
}
#ifdef WIN32
#define ILibChain_HandleInfoIndex(i) (FD_SETSIZE+i)
WCHAR ILibWideScratchPad[4096];
char ILibUTF8ScratchPad[4096];
char *ILibWideToUTF8Ex(WCHAR* wstr, int len, char *buffer, int bufferLen)
@@ -948,7 +949,8 @@ typedef struct ILibBaseChain
pthread_t ChainThreadID;
#endif
#if defined(WIN32)
SOCKET TerminateSock;
int UnblockFlag;
void* auxSelectHandles;
#else
int TerminatePipe[2];
#endif
@@ -973,6 +975,14 @@ typedef struct ILibBaseChain
void *node;
}ILibBaseChain;
typedef struct ILibChain_WaitHandleInfo
{
void *node;
ILibChain_WaitHandleHandler handler;
void *user;
struct timeval expiration;
}ILibChain_WaitHandleInfo;
void* ILibMemory_AllocateA_Init(void *buffer)
{
((void**)buffer)[0] = (char*)buffer + sizeof(void*);
@@ -1834,7 +1844,6 @@ void *ILibCreateChainEx(int extraMemorySize)
#if defined(WIN32) || defined(_WIN32_WCE)
RetVal->TerminateFlag = 1;
RetVal->TerminateSock = WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_NO_HANDLE_INHERIT);
RetVal->ChainProcessHandle = GetCurrentProcess();
if (!SymInitialize(RetVal->ChainProcessHandle, NULL, TRUE)) { RetVal->ChainProcessHandle = NULL; }
#endif
@@ -1902,12 +1911,8 @@ void* ILibGetBaseTimer(void *chain)
void __stdcall ILibForceUnBlockChain_APC(ULONG_PTR obj)
{
struct ILibBaseChain *c = (struct ILibBaseChain*)obj;
c->selectTimeout = 0;
if (c->TerminateSock != ~0)
{
closesocket(c->TerminateSock);
}
c->TerminateSock = WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_NO_HANDLE_INHERIT);
//c->selectTimeout = 0;
c->UnblockFlag = 1;
}
#endif
/*! \fn ILibForceUnBlockChain(void *Chain)
@@ -2061,12 +2066,7 @@ ILibExportMethod void ILibChain_Continue(void *Chain, ILibChain_Link **modules,
tv.tv_usec = 1000 * (chain->selectTimeout % 1000);
#if defined(WIN32) || defined(_WIN32_WCE)
//
// Add the fake socket, for ILibForceUnBlockChain
//
FD_SET(chain->TerminateSock, &errorset);
#else
#if !defined(WIN32)
//
// Put the Read end of the Pipe in the FDSET, for ILibForceUnBlockChain
//
@@ -2762,9 +2762,7 @@ char *ILibChain_GetMetaDataFromDescriptorSet(void *chain, fd_set *inr, fd_set *i
if (len > 0) { ret = ILibScratchPad; }
if (ret == NULL)
{
#if defined(WIN32)
if (FD_ISSET(((ILibBaseChain*)chain)->TerminateSock, ine) || FD_ISSET(((ILibBaseChain*)chain)->TerminateSock, inr)) { ret = "ILibForceUnblockChain"; }
#else
#if !defined(WIN32)
if (FD_ISSET(((ILibBaseChain*)chain)->TerminatePipe[0], inr)) { ret = "ILibForceUnblockChain"; }
#endif
if (ret == NULL)
@@ -2806,6 +2804,25 @@ void *ILibChain_GetObjectForDescriptor(void *chain, int fd)
return(ret);
}
#ifdef WIN32
void ILibChain_AddWaitHandle(void *chain, HANDLE h, int msTIMEOUT, ILibChain_WaitHandleHandler handler, void *user)
{
void *node = ILibLinkedList_AddTail(((ILibBaseChain*)chain)->auxSelectHandles, h);
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)ILibMemory_Extra(node);
info->handler = handler;
info->user = user;
info->node = node;
if (msTIMEOUT != INFINITE && msTIMEOUT >= 0)
{
ILibGetTimeOfDay(&(info->expiration));
info->expiration.tv_sec += (long)(msTIMEOUT / 1000);
info->expiration.tv_usec += ((msTIMEOUT % 1000) * 1000);
}
ILibForceUnBlockChain(chain);
}
#endif
/*! \fn ILibStartChain(void *Chain)
\brief Starts a Chain
\par
@@ -2824,7 +2841,17 @@ ILibExportMethod void ILibStartChain(void *Chain)
fd_set errorset;
fd_set writeset;
struct timeval tv;
#ifdef WIN32
void *node;
DWORD waitTimeout;
HANDLE selectHandles[FD_SETSIZE];
HANDLE selectEvents[FD_SETSIZE*2];
memset(selectHandles, 0, sizeof(selectHandles));
memset(selectEvents, 0, sizeof(selectEvents));
struct timeval currentTime;
struct timeval expirationTime;
#endif
struct timeval tv;
int slct;
int vX;
@@ -2835,6 +2862,7 @@ ILibExportMethod void ILibStartChain(void *Chain)
chain->ChainThreadID = GetCurrentThreadId();
chain->ChainProcessHandle = GetCurrentProcess();
chain->MicrostackThreadHandle = OpenThread(THREAD_ALL_ACCESS, FALSE, chain->ChainThreadID);
chain->auxSelectHandles = ILibLinkedList_CreateEx(sizeof(ILibChain_WaitHandleInfo));
#else
chain->ChainThreadID = pthread_self();
#endif
@@ -2921,12 +2949,7 @@ ILibExportMethod void ILibStartChain(void *Chain)
tv.tv_usec = 1000 * (chain->selectTimeout % 1000);
#if defined(WIN32) || defined(_WIN32_WCE)
//
// Add the fake socket, for ILibForceUnBlockChain
//
FD_SET(chain->TerminateSock, &errorset);
#else
#if !defined(WIN32)
//
// Put the Read end of the Pipe in the FDSET, for ILibForceUnBlockChain
//
@@ -2952,14 +2975,139 @@ ILibExportMethod void ILibStartChain(void *Chain)
//
chain->PreSelectCount++;
#ifdef WIN32
if (readset.fd_count == 0 && writeset.fd_count == 0)
if (readset.fd_count == 0 && writeset.fd_count == 0 && ILibLinkedList_GetNode_Head(chain->auxSelectHandles)==NULL)
{
SleepEx((DWORD)chain->selectTimeout, TRUE); // If there is no pending IO, we must force the thread into an alertable wait state, so ILibForceUnblockChain can function.
slct = -1;
}
else
{
int i;
int x = 0;
long flags;
for (i = 0; i < (int)readset.fd_count; ++i)
{
selectHandles[x++] = (HANDLE)readset.fd_array[i];
selectEvents[ILibChain_HandleInfoIndex(x - 1)] = NULL;
}
for (i = 0; i < (int)writeset.fd_count; ++i)
{
if (!FD_ISSET(writeset.fd_array[i], &readset))
{
selectHandles[x++] = (HANDLE)writeset.fd_array[i];
selectEvents[ILibChain_HandleInfoIndex(x - 1)] = NULL;
}
}
for (i = 0; i < (int)errorset.fd_count; ++i)
{
if (!FD_ISSET(errorset.fd_array[i], &readset) && !FD_ISSET(errorset.fd_array[i], &writeset))
{
selectHandles[x++] = (HANDLE)errorset.fd_array[i];
selectEvents[ILibChain_HandleInfoIndex(x - 1)] = NULL;
}
}
for (i = 0; i < x; ++i)
{
if (selectEvents[i] == NULL || selectEvents[ILibChain_HandleInfoIndex(i)] != NULL)
{
selectEvents[i] = WSACreateEvent();
}
else
{
WSAResetEvent(selectEvents[i]);
}
flags = 0;
if (FD_ISSET(selectHandles[i], &readset)) { flags |= (FD_READ | FD_ACCEPT); }
if (FD_ISSET(selectHandles[i], &writeset)) { flags |= (FD_WRITE | FD_CONNECT); }
if (FD_ISSET(selectHandles[i], &errorset)) { flags |= FD_CLOSE; }
WSAEventSelect((SOCKET)selectHandles[i], selectEvents[i], flags);
}
ILibGetTimeOfDay(&currentTime);
memcpy_s(&expirationTime, sizeof(struct timeval), &currentTime, sizeof(struct timeval));
expirationTime.tv_sec += tv.tv_sec;
expirationTime.tv_usec += tv.tv_usec;
node = ILibLinkedList_GetNode_Head(chain->auxSelectHandles);
while (node != NULL)
{
i = x++;
if (selectEvents[i] != NULL && selectEvents[ILibChain_HandleInfoIndex(i)] == NULL) { WSACloseEvent(selectEvents[i]); }
selectEvents[i] = (HANDLE)ILibLinkedList_GetDataFromNode(node);
selectEvents[ILibChain_HandleInfoIndex(i)] = (HANDLE)ILibMemory_Extra(node);
if (((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec != 0 || ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec != 0)
{
// Timeout was specified
if (tv2LTtv1(&expirationTime, &(((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration)))
{
expirationTime.tv_sec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_sec;
expirationTime.tv_usec = ((ILibChain_WaitHandleInfo*)ILibMemory_Extra(node))->expiration.tv_usec;
// If the expiration happens in the past, we need to set the timeout to zero
if (tv2LTtv1(&expirationTime, &currentTime)) { expirationTime.tv_sec = currentTime.tv_sec; expirationTime.tv_usec = currentTime.tv_usec; }
}
}
node = ILibLinkedList_GetNextNode(node);
}
expirationTime.tv_sec -= currentTime.tv_sec;
expirationTime.tv_usec -= currentTime.tv_usec;
waitTimeout = (DWORD)((expirationTime.tv_sec * 1000) + (expirationTime.tv_usec / 0.001));
while ((slct = WaitForMultipleObjectsEx(x, selectEvents, FALSE, (DWORD)((tv.tv_sec * 1000) + (tv.tv_usec / 0.001)), TRUE)) == WAIT_IO_COMPLETION && chain->UnblockFlag == 0) {}
ILibGetTimeOfDay(&currentTime);
if (slct != WAIT_IO_COMPLETION && (slct - WAIT_OBJECT_0 >= 0) && (slct - WAIT_OBJECT_0 < x))
{
if (selectEvents[ILibChain_HandleInfoIndex(slct)] != NULL)
{
ILibChain_WaitHandleInfo *info = (ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(slct)];
selectEvents[ILibChain_HandleInfoIndex(slct)] = NULL;
if (info->handler != NULL)
{
if (info->handler(chain, selectEvents[slct], ILibWaitHandle_ErrorStatus_NONE, info->user) == FALSE)
{
// FALSE means to remove tha HANDLE
ILibLinkedList_Remove(info->node);
}
}
}
}
if (slct == WAIT_TIMEOUT)
{
for (i = 0; i < x; ++i)
{
if (selectEvents[ILibChain_HandleInfoIndex(i)] != NULL && tvnonzero(&(((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->expiration)))
{
if (tv2LTEtv1(&currentTime, &(((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->expiration)))
{
// TIMEOUT occured
if (((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->handler != NULL)
{
((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->handler(chain, selectEvents[i], ILibWaitHandle_ErrorStatus_TIMEOUT, ((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->user);
}
ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->node);
}
}
}
}
if (slct == WAIT_FAILED)
{
// One of the handles is invalid... Kick it out
for (i = 0; i < x; ++i)
{
if (selectEvents[ILibChain_HandleInfoIndex(i)] != NULL)
{
if (WaitForSingleObject(selectEvents[i], 0) == WAIT_FAILED)
{
if (((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->handler != NULL)
{
((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->handler(chain, selectEvents[i], ILibWaitHandle_ErrorStatus_INVALID_HANDLE, ((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->user);
}
ILibLinkedList_Remove(((ILibChain_WaitHandleInfo*)selectEvents[ILibChain_HandleInfoIndex(i)])->node);
}
}
}
}
tv.tv_sec = 0; tv.tv_usec = 0;
slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv);
chain->UnblockFlag = 0;
}
#else
slct = select(FD_SETSIZE, &readset, &writeset, &errorset, &tv);
@@ -3083,6 +3231,16 @@ ILibExportMethod void ILibStartChain(void *Chain)
chain->node = ILibLinkedList_Remove(chain->node);
}
for (vX = 0; vX < FD_SETSIZE; ++vX)
{
if (selectHandles[vX] != NULL && selectHandles[ILibChain_HandleInfoIndex(vX)] == NULL)
{
WSACloseEvent(selectHandles[vX]);
}
}
ILibLinkedList_Destroy(chain->auxSelectHandles);
if (gILibChain == Chain) { gILibChain = NULL; } // Reset the global instance if it was set
//
@@ -3129,13 +3287,6 @@ ILibExportMethod void ILibStartChain(void *Chain)
((ILibBaseChain*)Chain)->TerminatePipe[0] = 0;
((ILibBaseChain*)Chain)->TerminatePipe[1] = 0;
#endif
#if defined(WIN32)
if (((ILibBaseChain*)Chain)->TerminateSock != ~0)
{
closesocket(((ILibBaseChain*)Chain)->TerminateSock);
((ILibBaseChain*)Chain)->TerminateSock = (SOCKET)~0;
}
#endif
#ifdef WIN32
WSACleanup();