diff --git a/microscript/ILibDuktape_ReadableStream.c b/microscript/ILibDuktape_ReadableStream.c index 59c0dea..ea9ff54 100644 --- a/microscript/ILibDuktape_ReadableStream.c +++ b/microscript/ILibDuktape_ReadableStream.c @@ -295,7 +295,8 @@ void __stdcall ILibDuktape_readableStream_WriteData_OnData_ChainThread_APC(ULONG int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, int streamReserved, char* buffer, size_t bufferLen) { - ILibDuktape_readableStream_nextWriteablePipe *w; + ILibTransport_DoneState rv; + ILibDuktape_readableStream_nextWriteablePipe *w, *wnext; int dispatchedNonNative = 0; int dispatched = 0; int needPause = 0; @@ -315,6 +316,11 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i stream->pipeInProgress = 1; ILibSpinLock_UnLock(&(stream->pipeLock)); +#ifdef WIN32 + stream->pipedThreadID = GetCurrentThreadId(); +#else + stream->pipedThreadID = pthread_self(); +#endif w = stream->nextWriteable; stream->pipe_pendingCount = 0; while (w != NULL) @@ -326,12 +332,15 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i w = stream->nextWriteable; while (w != NULL) { + wnext = w->next; if (w->nativeWritable != NULL) { ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable; ws->Reserved = streamReserved; ws->endBytes = -1; - switch (ws->WriteSink(ws, buffer, (int)bufferLen, ws->WriteSink_User)) + rv = ws->WriteSink(ws, buffer, (int)bufferLen, ws->WriteSink_User); + if (!ILibMemory_CanaryOK(ws)) { break; } + switch (rv) { case ILibTransport_DoneState_INCOMPLETE: ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush; @@ -377,7 +386,7 @@ int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, i } } } - w = w->next; + w = ILibMemory_CanaryOK(wnext) ? wnext : NULL; } } @@ -647,8 +656,7 @@ duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx) duk_swap_top(ctx, -2); // [push][this] duk_dup(ctx, 0); // [push][this][dest] ILibDuktape_Push_ObjectStash(ctx); // [push][this][dest][stash] - duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); // [push][this][dest][stash][buffer] - w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL); + w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); duk_put_prop_string(ctx, -2, Duktape_GetStashKey(duk_get_heapptr(ctx, -1))); // [push][this][dest][stash] duk_pop(ctx); // [push][this][dest] duk_call_method(ctx, 1); duk_pop(ctx); // ... @@ -724,7 +732,11 @@ void ILibDuktape_readableStream_unpipe_later(duk_context *ctx, void ** args, int if (data->emitter->ctx == NULL) { return; } ILibSpinLock_Lock(&(data->pipeLock)); - if (data->pipeInProgress != 0) +#ifdef WIN32 + if (data->pipeInProgress != 0 && data->pipedThreadID != GetCurrentThreadId()) +#else + if (data->pipeInProgress != 0 && !pthread_equal(pthread_self(), data->pipedThreadID)) +#endif { // We must yield, and try again, because there's an active dispatch going on void *imm = ILibDuktape_Immediate(ctx, (void*[]) { args[0], args[1] }, argsLen, ILibDuktape_readableStream_unpipe_later); @@ -796,6 +808,7 @@ void ILibDuktape_readableStream_unpipe_later(duk_context *ctx, void ** args, int else { // 'unpipe' all pipes + data->pipeInProgress = 0; while (w != NULL) { duk_push_heapptr(ctx, w->writableStream); // [ws] @@ -863,7 +876,8 @@ duk_ret_t ILibDuktape_readableStream_unpipe(duk_context *ctx) duk_dup(ctx, -2); // [readable][pause][this] duk_call_method(ctx, 0); duk_pop(ctx); // [readable] } - + + //ILibDuktape_readableStream_unpipe_later(ctx, (void*[]) { data->object, nargs == 0 ? NULL : duk_get_heapptr(ctx, 0) }, nargs == 0 ? 1 : 2); // We must yield, and do this on the next event loop, because we can't unpipe if we're called from a pipe'ed call void *imm = ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, -1), nargs == 1 ? duk_get_heapptr(ctx, 0) : NULL }, nargs + 1, ILibDuktape_readableStream_unpipe_later); duk_push_heapptr(ctx, imm); // [immediate] diff --git a/microscript/ILibDuktape_ReadableStream.h b/microscript/ILibDuktape_ReadableStream.h index 3adfeef..953d368 100644 --- a/microscript/ILibDuktape_ReadableStream.h +++ b/microscript/ILibDuktape_ReadableStream.h @@ -54,6 +54,11 @@ typedef struct ILibDuktape_readableStream #endif int endRelayed; int pipeInProgress; +#ifdef WIN32 + DWORD pipedThreadID; +#else + pthread_t pipedThreadID; +#endif int unpipeInProgress; int bypassValue; int noPropagateEnd;