mirror of
https://github.com/Ylianst/MeshAgent
synced 2025-12-06 00:13:33 +00:00
Fixed bug in pipe/unpipe logic that could cause 100% cpu usage
This commit is contained in:
@@ -295,7 +295,8 @@ void __stdcall ILibDuktape_readableStream_WriteData_OnData_ChainThread_APC(ULONG
|
||||
|
||||
int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, int streamReserved, char* buffer, size_t bufferLen)
|
||||
{
|
||||
ILibDuktape_readableStream_nextWriteablePipe *w;
|
||||
ILibTransport_DoneState rv;
|
||||
ILibDuktape_readableStream_nextWriteablePipe *w, *wnext;
|
||||
int dispatchedNonNative = 0;
|
||||
int dispatched = 0;
|
||||
int needPause = 0;
|
||||
@@ -315,6 +316,11 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i
|
||||
stream->pipeInProgress = 1;
|
||||
ILibSpinLock_UnLock(&(stream->pipeLock));
|
||||
|
||||
#ifdef WIN32
|
||||
stream->pipedThreadID = GetCurrentThreadId();
|
||||
#else
|
||||
stream->pipedThreadID = pthread_self();
|
||||
#endif
|
||||
w = stream->nextWriteable;
|
||||
stream->pipe_pendingCount = 0;
|
||||
while (w != NULL)
|
||||
@@ -326,12 +332,15 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i
|
||||
w = stream->nextWriteable;
|
||||
while (w != NULL)
|
||||
{
|
||||
wnext = w->next;
|
||||
if (w->nativeWritable != NULL)
|
||||
{
|
||||
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
|
||||
ws->Reserved = streamReserved;
|
||||
ws->endBytes = -1;
|
||||
switch (ws->WriteSink(ws, buffer, (int)bufferLen, ws->WriteSink_User))
|
||||
rv = ws->WriteSink(ws, buffer, (int)bufferLen, ws->WriteSink_User);
|
||||
if (!ILibMemory_CanaryOK(ws)) { break; }
|
||||
switch (rv)
|
||||
{
|
||||
case ILibTransport_DoneState_INCOMPLETE:
|
||||
ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush;
|
||||
@@ -377,7 +386,7 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i
|
||||
}
|
||||
}
|
||||
}
|
||||
w = w->next;
|
||||
w = ILibMemory_CanaryOK(wnext) ? wnext : NULL;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -647,8 +656,7 @@ duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx)
|
||||
duk_swap_top(ctx, -2); // [push][this]
|
||||
duk_dup(ctx, 0); // [push][this][dest]
|
||||
ILibDuktape_Push_ObjectStash(ctx); // [push][this][dest][stash]
|
||||
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); // [push][this][dest][stash][buffer]
|
||||
w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL);
|
||||
w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe));
|
||||
duk_put_prop_string(ctx, -2, Duktape_GetStashKey(duk_get_heapptr(ctx, -1))); // [push][this][dest][stash]
|
||||
duk_pop(ctx); // [push][this][dest]
|
||||
duk_call_method(ctx, 1); duk_pop(ctx); // ...
|
||||
@@ -724,7 +732,11 @@ void ILibDuktape_readableStream_unpipe_later(duk_context *ctx, void ** args, int
|
||||
|
||||
if (data->emitter->ctx == NULL) { return; }
|
||||
ILibSpinLock_Lock(&(data->pipeLock));
|
||||
if (data->pipeInProgress != 0)
|
||||
#ifdef WIN32
|
||||
if (data->pipeInProgress != 0 && data->pipedThreadID != GetCurrentThreadId())
|
||||
#else
|
||||
if (data->pipeInProgress != 0 && !pthread_equal(pthread_self(), data->pipedThreadID))
|
||||
#endif
|
||||
{
|
||||
// We must yield, and try again, because there's an active dispatch going on
|
||||
void *imm = ILibDuktape_Immediate(ctx, (void*[]) { args[0], args[1] }, argsLen, ILibDuktape_readableStream_unpipe_later);
|
||||
@@ -796,6 +808,7 @@ void ILibDuktape_readableStream_unpipe_later(duk_context *ctx, void ** args, int
|
||||
else
|
||||
{
|
||||
// 'unpipe' all pipes
|
||||
data->pipeInProgress = 0;
|
||||
while (w != NULL)
|
||||
{
|
||||
duk_push_heapptr(ctx, w->writableStream); // [ws]
|
||||
@@ -863,7 +876,8 @@ duk_ret_t ILibDuktape_readableStream_unpipe(duk_context *ctx)
|
||||
duk_dup(ctx, -2); // [readable][pause][this]
|
||||
duk_call_method(ctx, 0); duk_pop(ctx); // [readable]
|
||||
}
|
||||
|
||||
|
||||
//ILibDuktape_readableStream_unpipe_later(ctx, (void*[]) { data->object, nargs == 0 ? NULL : duk_get_heapptr(ctx, 0) }, nargs == 0 ? 1 : 2);
|
||||
// We must yield, and do this on the next event loop, because we can't unpipe if we're called from a pipe'ed call
|
||||
void *imm = ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, -1), nargs == 1 ? duk_get_heapptr(ctx, 0) : NULL }, nargs + 1, ILibDuktape_readableStream_unpipe_later);
|
||||
duk_push_heapptr(ctx, imm); // [immediate]
|
||||
|
||||
@@ -54,6 +54,11 @@ typedef struct ILibDuktape_readableStream
|
||||
#endif
|
||||
int endRelayed;
|
||||
int pipeInProgress;
|
||||
#ifdef WIN32
|
||||
DWORD pipedThreadID;
|
||||
#else
|
||||
pthread_t pipedThreadID;
|
||||
#endif
|
||||
int unpipeInProgress;
|
||||
int bypassValue;
|
||||
int noPropagateEnd;
|
||||
|
||||
Reference in New Issue
Block a user