1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-22 11:13:21 +00:00

Much improved, better stability, lots of fixes

This commit is contained in:
Ylian Saint-Hilaire
2018-01-12 11:50:04 -08:00
parent becf71557f
commit 508646044e
69 changed files with 11803 additions and 4088 deletions

View File

@@ -25,6 +25,7 @@ limitations under the License.
#define ILibDuktape_readableStream_WritePipes "\xFF_WritePipes"
#define ILibDuktape_readableStream_WritePipes_PTRBUFFER "\xFF_WritePipesPtrBuffer"
#define ILibDuktape_readableStream_WritePipes_Stream "\xFF_WritePipes_Stream"
#define ILibDuktape_readableStream_PipeArray "\xFF_RS_PipeArray"
#ifdef __DOXY__
/*!
@@ -124,40 +125,48 @@ void ILibDuktape_readableStream_WriteData_buffer(ILibDuktape_readableStream *str
}
tmp->Next = buffered;
}
if (stream->paused == 0)
{
stream->paused = 1;
stream->PauseHandler(stream, stream->user);
}
}
void ILibDuktape_readableStream_WriteData_OnData_ChainThread(void *chain, void *user)
{
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)user;
ILibDuktape_readableStream_bufferedData *data = (ILibDuktape_readableStream_bufferedData*)user;
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)data->Next;
stream->paused = 0;
duk_push_heapptr(stream->ctx, stream->OnData); // [func]
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
if (stream->extBuffer_Reserved == 0)
if (data->Reserved == 0)
{
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
duk_config_buffer(stream->ctx, -1, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
duk_push_external_buffer(stream->ctx); // [ext]
duk_config_buffer(stream->ctx, -1, data->buffer, data->bufferLen);
}
duk_push_heapptr(stream->ctx, stream->OnData); // [ext][func]
duk_push_heapptr(stream->ctx, stream->object); // [ext][func][this]
if (data->Reserved == 0)
{
duk_push_buffer_object(stream->ctx, -3, 0, data->bufferLen, DUK_BUFOBJ_NODEJS_BUFFER); // [ext][func][this][buffer]
}
else
{
duk_push_lstring(stream->ctx, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
duk_push_lstring(stream->ctx, data->buffer, data->bufferLen); // [ext][func][this][buffer/string]
}
if (duk_pcall_method(stream->ctx, 1) != 0) // [retVal]
if (duk_pcall_method(stream->ctx, 1) != 0) // [...][retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
duk_pop(stream->ctx); // ...
if (data->Reserved == 0)
{
duk_pop_2(stream->ctx); // ...
}
else
{
duk_pop(stream->ctx); // ...
}
free(data);
if (stream->paused == 0 && stream->ResumeHandler != NULL) { stream->ResumeHandler(stream, stream->user); }
}
void ILibDuktape_readableStream_WriteData_Flush(struct ILibDuktape_WritableStream *ws, void *user)
int ILibDuktape_readableStream_WriteData_Flush(struct ILibDuktape_WritableStream *ws, void *user)
{
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)user;
int unpipeInProgress = 0;
#ifdef WIN32
if(InterlockedDecrement(&(stream->pipe_pendingCount)) == 0)
#elif defined(__ATOMIC_SEQ_CST)
@@ -169,243 +178,220 @@ void ILibDuktape_readableStream_WriteData_Flush(struct ILibDuktape_WritableStrea
if(stream->pipe_pendingCount == 0)
#endif
{
if (stream->ResumeHandler != NULL) { stream->paused = 0; stream->ResumeHandler(stream, stream->user); }
sem_wait(&(stream->pipeLock));
stream->pipeInProgress = 0;
unpipeInProgress = stream->unpipeInProgress;
sem_post(&(stream->pipeLock));
if (unpipeInProgress == 0 && stream->ResumeHandler != NULL && stream->paused != 0) { stream->paused = 0; stream->ResumeHandler(stream, stream->user); }
return(1);
}
return(0);
}
duk_ret_t ILibDuktape_readableStream_WriteData_Flush_JS(duk_context *ctx)
duk_ret_t ILibDuktape_readableStream_WriteDataEx_Flush(duk_context *ctx)
{
ILibDuktape_readableStream *stream;
duk_push_current_function(ctx);
duk_get_prop_string(ctx, -1, "readable");
stream = (ILibDuktape_readableStream*)duk_get_pointer(ctx, -1);
duk_get_prop_string(ctx, -1, "\xFF_STREAM");
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)duk_to_pointer(ctx, -1);
ILibDuktape_readableStream_WriteData_Flush(NULL, stream);
return 0;
return(0);
}
void ILibDuktape_readableStream_WriteData_ChainThread(void *chain, void *user)
int ILibDuktape_readableStream_WriteDataEx_Chain_Dispatch(ILibDuktape_readableStream *stream, void *ws, char *buffer, int bufferLen)
{
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)user;
ILibDuktape_readableStream_nextWriteablePipe *w;
int jsCount = 0;
int retVal = 0;
duk_push_external_buffer(stream->ctx); // [ext]
duk_config_buffer(stream->ctx, -1, buffer, bufferLen);
duk_push_heapptr(stream->ctx, ws); // [ext][ws]
duk_get_prop_string(stream->ctx, -1, "write"); // [ext][ws][write]
duk_swap_top(stream->ctx, -2); // [ext][write][this]
duk_push_buffer_object(stream->ctx, -3, 0, bufferLen, DUK_BUFOBJ_NODEJS_BUFFER); // [ext][write][this][buffer]
duk_push_c_function(stream->ctx, ILibDuktape_readableStream_WriteDataEx_Flush, DUK_VARARGS);// [ext][write][this][buffer][flush]
duk_push_pointer(stream->ctx, stream); // [ext][write][this][buffer][flush][ptr]
duk_put_prop_string(stream->ctx, -2, "\xFF_STREAM"); // [ext][write][this][buffer][flush]
if (duk_pcall_method(stream->ctx, 2) != 0) // [ext][...]
{
ILibDuktape_Process_UncaughtExceptionEx(stream->ctx, "readable.write(): Error Piping ");
if (ILibDuktape_readableStream_WriteData_Flush(NULL, stream)) { retVal = 2; }
}
retVal = duk_to_boolean(stream->ctx, -1) ? 1 : 0;
duk_pop_2(stream->ctx);
sem_wait(&(stream->pipeLock));
w = stream->nextWriteable;
stream->pipe_pendingCount = 0;
return(retVal);
}
void ILibDuktape_readableStream_WriteDataEx_Chain(void *chain, void *user)
{
ILibDuktape_readableStream_bufferedData *data = (ILibDuktape_readableStream_bufferedData*)user;
ILibDuktape_readableStream *stream = (ILibDuktape_readableStream*)data->Next;
ILibDuktape_readableStream_nextWriteablePipe *w = stream->nextWriteable;
while (w != NULL)
{
if (w->nativeWritable != NULL || w->writableStream != NULL) { stream->pipe_pendingCount++; }
w = w->next;
}
w = stream->nextWriteable;
while (w != NULL)
{
if (w->nativeWritable != NULL)
if (w->writableStream != NULL && w->nativeWritable == NULL)
{
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
switch (ws->WriteSink(ws, stream->extBuffer_buffer, stream->extBuffer_bufferLen, ws->WriteSink_User))
{
case ILibTransport_DoneState_INCOMPLETE:
ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush;
ws->OnWriteFlushEx_User = stream;
break;
case ILibTransport_DoneState_COMPLETE:
ws->OnWriteFlushEx = NULL;
ws->OnWriteFlushEx_User = NULL;
#ifdef WIN32
InterlockedDecrement(&(stream->pipe_pendingCount));
#elif defined(__ATOMIC_SEQ_CST)
__atomic_sub_fetch(&(stream->pipe_pendingCount), 1, __ATOMIC_SEQ_CST);
#else
--stream->pipe_pendingCount;
#endif
break;
case ILibTransport_DoneState_ERROR:
#ifdef WIN32
InterlockedDecrement(&(stream->pipe_pendingCount));
#elif defined(__ATOMIC_SEQ_CST)
__atomic_sub_fetch(&(stream->pipe_pendingCount), 1, __ATOMIC_SEQ_CST);
#else
--stream->pipe_pendingCount;
#endif
break;
}
}
else if(w->writableStream != NULL)
{
duk_push_heapptr(stream->ctx, w->writableStream); // [stream]
duk_get_prop_string(stream->ctx, -1, "write"); // [stream][func]
duk_swap_top(stream->ctx, -2); // [func][this]
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][chunk]
duk_config_buffer(stream->ctx, -1, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
duk_push_c_function(stream->ctx, ILibDuktape_readableStream_WriteData_Flush_JS, DUK_VARARGS); // [func][this][chunk][callback]
duk_push_pointer(stream->ctx, stream); // [func][this][chunk][callback][user]
duk_put_prop_string(stream->ctx, -2, "readable"); // [func][this][chunk][callback]
if (duk_pcall_method(stream->ctx, 2) != 0) // [retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
else
{
jsCount += duk_get_int(stream->ctx, -1);
}
duk_pop(stream->ctx);
if (ILibDuktape_readableStream_WriteDataEx_Chain_Dispatch(stream, w->writableStream, data->buffer, data->bufferLen) == 2) { break; }
}
w = w->next;
}
if (stream->paused != 0 && stream->pipe_pendingCount == 0)
{
sem_post(&(stream->pipeLock));
if (stream->ResumeHandler != NULL) { stream->paused = 0; stream->ResumeHandler(stream, stream->user); }
}
else
{
sem_post(&(stream->pipeLock));
}
free(data);
}
int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, int streamReserved, char* buffer, int bufferLen)
{
ILibDuktape_readableStream_nextWriteablePipe *w;
int nonNativeCount = 0;
int nativeCount = 0;
int dispatchedNonNative = 0;
int noContinue = 0;
int dispatched = 0;
int needPause = 0;
if (stream == NULL) { return(1); }
if (stream->paused != 0)
{
ILibDuktape_readableStream_WriteData_buffer(stream, streamReserved, buffer, bufferLen);
if (stream->paused == 0 && stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
return(stream->paused);
}
if (stream->bypassValue == 0 || stream->bypassValue != streamReserved)
{
sem_wait(&(stream->pipeLock));
stream->pipeInProgress = 1;
sem_post(&(stream->pipeLock));
w = stream->nextWriteable;
stream->pipe_pendingCount = 0;
while (w != NULL)
{
++stream->pipe_pendingCount;
w = w->next;
}
dispatched = stream->pipe_pendingCount;
w = stream->nextWriteable;
while (w != NULL)
{
if (w->nativeWritable == 0) { ++nonNativeCount; }
else { ++nativeCount; }
w = w->next;
}
w = stream->nextWriteable;
if (w != NULL)
{
if (nonNativeCount > 0)
if (w->nativeWritable != NULL)
{
// There are piped Pure JavaScript objects... We must context switch to Microstack Thread
stream->extBuffer_Reserved = streamReserved;
stream->extBuffer_buffer = buffer;
stream->extBuffer_bufferLen = bufferLen;
sem_post(&(stream->pipeLock));
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_ChainThread, stream);
return(stream->paused);
}
else
{
// All piped objects are native, so we can blast out a send
stream->pipe_pendingCount = nativeCount;
while (w != NULL)
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
ws->Reserved = streamReserved;
switch (ws->WriteSink(ws, buffer, bufferLen, ws->WriteSink_User))
{
if (w->nativeWritable != NULL)
{
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
ws->Reserved = streamReserved;
switch (ws->WriteSink(ws, buffer, bufferLen, ws->WriteSink_User))
{
case ILibTransport_DoneState_INCOMPLETE:
ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush;
ws->OnWriteFlushEx_User = stream;
break;
case ILibTransport_DoneState_COMPLETE:
ws->OnWriteFlushEx = NULL;
ws->OnWriteFlushEx_User = NULL;
#ifdef WIN32
InterlockedDecrement(&(stream->pipe_pendingCount));
#elif defined(__ATOMIC_SEQ_CST)
__atomic_sub_fetch(&(stream->pipe_pendingCount), 1, __ATOMIC_SEQ_CST);
#else
--stream->pipe_pendingCount;
#endif
break;
case ILibTransport_DoneState_ERROR:
#ifdef WIN32
InterlockedDecrement(&(stream->pipe_pendingCount));
#elif defined(__ATOMIC_SEQ_CST)
__atomic_sub_fetch(&(stream->pipe_pendingCount), 1, __ATOMIC_SEQ_CST);
#else
--stream->pipe_pendingCount;
#endif
break;
}
}
w = w->next;
case ILibTransport_DoneState_INCOMPLETE:
ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush;
ws->OnWriteFlushEx_User = stream;
needPause = 1;
break;
case ILibTransport_DoneState_COMPLETE:
ws->OnWriteFlushEx = NULL;
ws->OnWriteFlushEx_User = NULL;
if (ILibDuktape_readableStream_WriteData_Flush(ws, stream)) { noContinue = 1; }
break;
case ILibTransport_DoneState_ERROR:
if (ILibDuktape_readableStream_WriteData_Flush(ws, stream)) { noContinue = 1; }
break;
}
if (stream->pipe_pendingCount == 0)
if (noContinue != 0) { break; }
}
else if (w->writableStream != NULL && dispatchedNonNative == 0)
{
if (ILibIsRunningOnChainThread(stream->chain) == 0)
{
sem_post(&(stream->pipeLock));
return(stream->paused);
ILibDuktape_readableStream_bufferedData *tmp = (ILibDuktape_readableStream_bufferedData*)ILibMemory_Allocate(sizeof(ILibDuktape_readableStream_bufferedData) + bufferLen, 0, NULL, NULL);
tmp->Next = (ILibDuktape_readableStream_bufferedData*)stream;
tmp->Reserved = streamReserved;
tmp->bufferLen = bufferLen;
memcpy_s(tmp->buffer, bufferLen, buffer, bufferLen);
dispatchedNonNative = 1;
needPause = 1;
ILibChain_RunOnMicrostackThreadEx(stream->chain, ILibDuktape_readableStream_WriteDataEx_Chain, tmp);
}
else
{
sem_post(&(stream->pipeLock));
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
return(stream->paused);
// We're running on the Chain Thread, so we can directly dispatch into JS
switch (ILibDuktape_readableStream_WriteDataEx_Chain_Dispatch(stream, w->writableStream, buffer, bufferLen))
{
case 1: // Need to Pause
needPause = 1;
break;
case 2: // Complete
noContinue = 1;
break;
default: // NOP
break;
}
}
}
}
else
{
sem_post(&(stream->pipeLock));
if (noContinue != 0) { break; }
w = w->next;
}
}
if (stream->OnData != NULL)
if (dispatched == 0)
{
if (ILibIsRunningOnChainThread(stream->chain))
sem_wait(&(stream->pipeLock));
stream->pipeInProgress = 0;
sem_post(&(stream->pipeLock));
if (stream->OnData != NULL)
{
duk_push_heapptr(stream->ctx, stream->OnData); // [func]
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
if (streamReserved == 0)
if (ILibIsRunningOnChainThread(stream->chain))
{
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
duk_config_buffer(stream->ctx, -1, buffer, bufferLen);
if (streamReserved == 0)
{
duk_push_external_buffer(stream->ctx); // [extBuffer]
duk_config_buffer(stream->ctx, -1, buffer, bufferLen);
}
duk_push_heapptr(stream->ctx, stream->OnData); // [func]
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
if (streamReserved == 0)
{
duk_push_buffer_object(stream->ctx, -3, 0, bufferLen, DUK_BUFOBJ_NODEJS_BUFFER); // [extBuffer][func][this][nodeBuffer]
}
else
{
duk_push_lstring(stream->ctx, buffer, bufferLen); // [func][this][string]
}
if (duk_pcall_method(stream->ctx, 1) != 0) // [retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
if (streamReserved == 0)
{
duk_pop_2(stream->ctx);
}
else
{
duk_pop(stream->ctx); // ...
}
}
else
{
duk_push_lstring(stream->ctx, buffer, bufferLen); // [func][this][string]
// Need to PAUSE, and context switch to Chain Thread, so we can dispatch into JavaScript
ILibDuktape_readableStream_bufferedData *tmp = (ILibDuktape_readableStream_bufferedData*)ILibMemory_Allocate(sizeof(ILibDuktape_readableStream_bufferedData) + bufferLen, 0, NULL, NULL);
tmp->bufferLen = bufferLen;
tmp->Reserved = streamReserved;
tmp->Next = (ILibDuktape_readableStream_bufferedData*)stream;
memcpy_s(tmp->buffer, bufferLen, buffer, bufferLen);
needPause = 1;
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_OnData_ChainThread, tmp);
}
if (duk_pcall_method(stream->ctx, 1) != 0) // [retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
duk_pop(stream->ctx); // ...
}
else
else if (stream->PauseHandler != NULL && stream->OnEnd == NULL)
{
// Need to PAUSE, and context switch to Chain Thread, so we can dispatch into JavaScript
sem_wait(&(stream->pipeLock));
stream->extBuffer_Reserved = streamReserved;
stream->extBuffer_buffer = buffer;
stream->extBuffer_bufferLen = bufferLen;
sem_post(&(stream->pipeLock));
if (stream->paused == 0 && stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_OnData_ChainThread, stream);
// If we get here, it means we are writing data, but nobody is going to be receiving it...
// So we need to buffer the data, so when we are resumed later, we can retry
needPause = 1;
ILibDuktape_readableStream_WriteData_buffer(stream, streamReserved, buffer, bufferLen);
}
else if (stream->OnEnd != NULL)
{
return 0;
}
}
else if(stream->PauseHandler != NULL && stream->OnEnd == NULL)
if (needPause)
{
// If we get here, it means we are writing data, but nobody is going to be receiving it...
// So we need to buffer the data, so when we are resumed later, we can retry
ILibDuktape_readableStream_WriteData_buffer(stream, streamReserved, buffer, bufferLen);
}
else if (stream->OnEnd != NULL)
{
return 0;
if (stream->paused == 0 && stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
}
return(stream->paused);
}
@@ -426,7 +412,24 @@ int ILibDuktape_readableStream_WriteEnd(ILibDuktape_readableStream *stream)
{
ILibDuktape_readableStream_nextWriteablePipe *next;
if (stream->OnEnd != NULL)
if (stream->noPropagateEnd == 0 && stream->nextWriteable != NULL)
{
next = stream->nextWriteable;
while (next != NULL)
{
duk_push_heapptr(stream->ctx, next->writableStream); // [stream]
duk_get_prop_string(stream->ctx, -1, "end"); // [stream][func]
duk_swap_top(stream->ctx, -2); // [func][this]
if (duk_pcall_method(stream->ctx, 0) != 0) // [retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
duk_pop(stream->ctx); // ...
next = next->next;
retVal = 0;
}
}
else if (stream->OnEnd != NULL)
{
duk_context *x = stream->ctx;
duk_push_heapptr(stream->ctx, stream->OnEnd); // [func]
@@ -438,29 +441,15 @@ int ILibDuktape_readableStream_WriteEnd(ILibDuktape_readableStream *stream)
duk_pop(x); // ...
retVal = 0;
}
next = stream->nextWriteable;
while (next != NULL)
{
duk_push_heapptr(stream->ctx, next->writableStream); // [stream]
duk_get_prop_string(stream->ctx, -1, "end"); // [stream][func]
duk_swap_top(stream->ctx, -2); // [func][this]
if (duk_pcall_method(stream->ctx, 0) != 0) // [retVal]
{
ILibDuktape_Process_UncaughtException(stream->ctx);
}
duk_pop(stream->ctx); // ...
next = next->next;
retVal = 0;
}
}
return retVal;
}
void ILibDuktape_readableStream_Closed(ILibDuktape_readableStream *stream)
{
ILibDuktape_readableStream_WriteEnd(stream);
if (stream->OnClose != NULL)
{
duk_push_heapptr(stream->ctx, stream->OnEnd); // [func]
duk_push_heapptr(stream->ctx, stream->OnClose); // [func]
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
if (duk_pcall_method(stream->ctx, 0) != 0) // [retVal]
{
@@ -479,7 +468,14 @@ duk_ret_t ILibDuktape_readableStream_pause(duk_context *ctx)
ptr = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [stream]
if (ptr->PauseHandler != NULL) { ptr->paused = 1; ptr->PauseHandler(ptr, ptr->user); }
if (ptr->PauseHandler != NULL)
{
ptr->paused = 1; ptr->PauseHandler(ptr, ptr->user);
}
else
{
return(ILibDuktape_Error(ctx, "Pause Not Supported"));
}
return 1;
}
@@ -523,151 +519,240 @@ duk_ret_t ILibDuktape_readableStream_resume(duk_context *ctx)
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [stream][ptrs]
ptr = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [stream]
if (ptr->ResumeHandler == NULL) { return(ILibDuktape_Error(ctx, "Resume not supported")); }
if (ILibDuktape_readableStream_resume_flush(ptr) == 0 && ptr->ResumeHandler != NULL) { ptr->paused = 0; ptr->ResumeHandler(ptr, ptr->user); }
return 1;
}
void ILibDuktape_readableStream_pipe_resumeFromTimer(void *obj)
void ILibDuktape_ReadableStream_pipe_ResumeLater(duk_context *ctx, void **args, int argsLen)
{
ILibDuktape_readableStream* ptr = (ILibDuktape_readableStream*)((void**)obj)[0];
if (ILibDuktape_readableStream_resume_flush(ptr) == 0 && ptr->ResumeHandler != NULL) { ptr->paused = 0; ptr->ResumeHandler(ptr, ptr->user); }
free(obj);
ILibDuktape_readableStream *rs = (ILibDuktape_readableStream*)args[0];
if (ILibDuktape_readableStream_resume_flush(rs) == 0 && rs->ResumeHandler != NULL) { rs->paused = 0; rs->ResumeHandler(rs, rs->user); }
if (rs->PipeHookHandler != NULL) { rs->PipeHookHandler(rs, args[1], rs->user); }
}
void ILibDuktape_readableStream_pipe_resumeFromTimer2(void *obj)
void ILibDuktape_readableStream_pipe_later(duk_context *ctx, void **args, int argsLen)
{
free(obj);
duk_push_heapptr(ctx, args[0]); // [readable]
duk_get_prop_string(ctx, -1, "pipe"); // [readable][pipe]
duk_swap_top(ctx, -2); // [pipe][this]
duk_push_heapptr(ctx, args[1]); // [pipe][this][writable]
if (argsLen > 2) { duk_push_heapptr(ctx, args[2]); } // [pipe][this][writable][options]
if (duk_pcall_method(ctx, argsLen - 1) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "readableStream.pipeLater(): "); }
duk_pop(ctx); // ...
}
duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx)
{
ILibDuktape_readableStream *rstream;
ILibDuktape_readableStream_nextWriteablePipe *w;
ILibDuktape_readableStream_nextWriteablePipe *w, *tmp;
int nargs = duk_get_top(ctx);
duk_push_this(ctx); // [readable]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [readable][ptrs]
rstream = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [readable]
duk_pop_2(ctx); // ...
if (nargs > 1 && duk_is_object(ctx, 1))
sem_wait(&(rstream->pipeLock));
if (rstream->pipeInProgress != 0)
{
rstream->bypassValue = Duktape_GetIntPropertyValue(ctx, 1, "dataTypeSkip", 0);
}
// We must YIELD and try again later, becuase there is an active dispatch going on
duk_push_this(ctx);
ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, -1), duk_get_heapptr(ctx, 0), nargs > 1 ? duk_get_heapptr(ctx, 1) : NULL }, 1 + nargs, ILibDuktape_readableStream_pipe_later);
duk_push_object(ctx); // [readable][nextWriteable]
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); // [readable][nextWriteable][ptrBuffer]
w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL);
memset(w, 0, sizeof(ILibDuktape_readableStream_nextWriteablePipe));
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes_PTRBUFFER); // [readable][nextWriteable]
if (duk_has_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes))
{
// There are already associated pipes
duk_get_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes); // [readable][nextWriteable][prevWriteable]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_WritePipes_PTRBUFFER); // [readable][nextWriteable][prevWriteable][ptr]
w->next = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [readable][nextWriteable][prevWriteable]
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes); // [readable][nextWriteable]
}
duk_dup(ctx, 0); // [readable][nextWriteable][stream]
if (duk_has_prop_string(ctx, -1, ILibDuktape_WritableStream_WSPTRS))
{
// This is one of our writable stream implementation... So we can keep everything native
duk_get_prop_string(ctx, -1, ILibDuktape_WritableStream_WSPTRS); // [readable][nextWriteable][stream][buffer]
w->nativeWritable = Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [readable][nextWriteable][stream]
}
w->writableStream = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes_Stream); // [readable][nextWriteable]
rstream->nextWriteable = w;
// Save to the readableStream
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes); // [readable]
duk_dup(ctx, 0); // [readable][writable]
if (duk_has_prop_string(ctx, -1, "emit"))
{
duk_push_string(ctx, "emit"); // [readable][writable][key]
duk_push_string(ctx, "pipe"); // [readable][writable][key][eventName]
duk_dup(ctx, -4); // [readable][writable][key][eventName][readable]
if (duk_pcall_prop(ctx, -4, 2) != 0) // [readable][writable][retVal/err]
{
ILibDuktape_Process_UncaughtException(ctx);
}
duk_pop_2(ctx); // [readable]
duk_dup(ctx, 0);
sem_post(&(rstream->pipeLock));
return(1);
}
else
{
duk_pop(ctx);
}
if (rstream->paused != 0)
{
void *chain = Duktape_GetChain(ctx);
if (chain != NULL)
// No Active Dispatch, so while we hold this lock, we can setup/add the pipe
duk_push_heapptr(ctx, rstream->pipeArray); // [pipeArray]
duk_get_prop_string(ctx, -1, "push"); // [pipeArray][push]
duk_swap_top(ctx, -2); // [push][this]
duk_dup(ctx, 0); // [push][this][dest]
ILibDuktape_Push_ObjectStash(ctx); // [push][this][dest][stash]
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); // [push][this][dest][stash][buffer]
w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL);
duk_put_prop_string(ctx, -2, Duktape_GetStashKey(duk_get_heapptr(ctx, -1))); // [push][this][dest][stash]
duk_pop(ctx); // [push][this][dest]
duk_call_method(ctx, 1); duk_pop(ctx); // ...
memset(w, 0, sizeof(ILibDuktape_readableStream_nextWriteablePipe));
w->writableStream = duk_get_heapptr(ctx, 0);
if (duk_has_prop_string(ctx, 0, ILibDuktape_WritableStream_WSPTRS))
{
// We are paused, so we should yield and resume... We yield, so in case the user tries to chain multiple pipes, it will chain first
void **tmp = (void**)ILibMemory_Allocate(sizeof(void*), 0, NULL, NULL);
tmp[0] = rstream;
ILibLifeTime_AddEx(ILibGetBaseTimer(chain), tmp, 0, ILibDuktape_readableStream_pipe_resumeFromTimer, ILibDuktape_readableStream_pipe_resumeFromTimer2);
// This is one of our writable stream implementation... So we can keep everything native
duk_get_prop_string(ctx, 0, ILibDuktape_WritableStream_WSPTRS); // [wrsPTR]
w->nativeWritable = Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // ...
// If JSCreate is non-zero, it means this is actually a JS Stream, not a native one
if (((int*)w->nativeWritable)[0] != 0) { w->nativeWritable = NULL; }
}
// Now lets lets add this entry to the end of the list, so it can be dispatched without invoking into JS to access the array
if (rstream->nextWriteable == NULL)
{
rstream->nextWriteable = w;
}
else
{
// Oops
duk_push_string(ctx, "ILibParsers_Duktape *MISSING*");
duk_throw(ctx);
return(DUK_RET_ERROR);
tmp = rstream->nextWriteable;
while (tmp->next != NULL) { tmp = tmp->next; }
tmp->next = w;
w->previous = tmp;
}
}
if (nargs > 1 && duk_is_object(ctx, 1))
{
rstream->bypassValue = Duktape_GetIntPropertyValue(ctx, 1, "dataTypeSkip", 0);
rstream->noPropagateEnd = Duktape_GetBooleanProperty(ctx, 1, "end", 1) == 0 ? 1 : 0;
}
sem_post(&(rstream->pipeLock));
if (rstream->PipeHookHandler != NULL) { rstream->PipeHookHandler(rstream, rstream->user); }
// Now we need to emit a 'pipe' event on the writable that we just attached
duk_push_heapptr(ctx, w->writableStream); // [dest]
duk_get_prop_string(ctx, -1, "emit"); // [dest][emit]
duk_swap_top(ctx, -2); // [emit][this]
duk_push_string(ctx, "pipe"); // [emit][this][pipe]
duk_push_this(ctx); // [emit][this][pipe][readable]
duk_call_method(ctx, 2); duk_pop(ctx); // ...
if (rstream->paused != 0)
{
// We are paused, so we should yield and resume... We yield, so in case the user tries to chain multiple pipes, it will chain first
ILibDuktape_Immediate(ctx, (void*[]) { rstream, duk_get_heapptr(ctx, 0) }, 1, ILibDuktape_ReadableStream_pipe_ResumeLater);
}
else
{
if (rstream->PipeHookHandler != NULL) { rstream->PipeHookHandler(rstream, duk_get_heapptr(ctx, 0), rstream->user); }
}
duk_dup(ctx, 0);
return 1;
}
void ILibDuktape_readableStream_unpipe_later(duk_context *ctx, void ** args, int argsLen)
{
ILibDuktape_readableStream *data;
ILibDuktape_readableStream_nextWriteablePipe *w;
int i;
duk_size_t arrayLen;
duk_push_heapptr(ctx, args[0]); // [readable]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [readable][ptrs]
data = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop_2(ctx); // ...
sem_wait(&(data->pipeLock));
if (data->pipeInProgress != 0)
{
// We must yield, and try again, because there's an active dispatch going on
ILibDuktape_Immediate(ctx, (void*[]) { args[0], args[1] }, argsLen, ILibDuktape_readableStream_unpipe_later);
sem_post(&(data->pipeLock));
return;
}
else
{
i = 0;
w = data->nextWriteable;
if (argsLen > 1)
{
// Specific stream was specified in 'unpipe'
while (w != NULL)
{
if (w->writableStream == args[1])
{
// Emit the 'unpipe' event
duk_push_heapptr(ctx, args[1]); // [ws]
duk_get_prop_string(ctx, -1, "emit"); // [ws][emit]
duk_swap_top(ctx, -2); // [emit][this]
duk_push_string(ctx, "unpipe"); // [emit][this][unpipe]
duk_push_heapptr(ctx, args[0]); // [emit][this][unpipe][readable]
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "readable.unpipe(): "); }
duk_pop(ctx); // ...
if (w->previous != NULL)
{
w->previous->next = w->next;
}
else
{
data->nextWriteable = w->next;
}
duk_push_heapptr(ctx, data->pipeArray); // [array]
arrayLen = duk_get_length(ctx, -1);
for (i = 0; i < (int)arrayLen; ++i)
{
duk_get_prop_index(ctx, -1, i); // [array][ws]
ILibDuktape_Push_ObjectStash(ctx); // [array][ws][stash]
if (duk_has_prop_string(ctx, -1, Duktape_GetStashKey(args[1])))
{
// Removing the entry from the Array
duk_pop_2(ctx); // [array]
duk_get_prop_string(ctx, -1, "splice"); // [array][splice]
duk_swap_top(ctx, -2); // [splice][this]
duk_push_int(ctx, i); // [splice][this][i]
duk_push_int(ctx, 1); // [splice][this][i][1]
duk_call_method(ctx, 2); // [undefined]
duk_pop(ctx); // ...
break;
}
duk_pop_2(ctx); // [array]
}
duk_pop(ctx); // ...
break;
}
w = w->next;
}
}
else
{
// 'unpipe' all pipes
w = data->nextWriteable;
while (w != NULL)
{
duk_push_heapptr(ctx, w->writableStream); // [ws]
duk_get_prop_string(ctx, -1, "emit"); // [ws][emit]
duk_swap_top(ctx, -2); // [emit][this]
duk_push_string(ctx, "unpipe"); // [emit][this][unpipe]
duk_push_heapptr(ctx, args[0]); // [emit][this][unpipe][readable]
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "readable.unpipe(): "); }
duk_pop(ctx); // ...
w = w->next;
}
data->nextWriteable = NULL;
duk_push_heapptr(ctx, args[0]); // [readable]
duk_del_prop_string(ctx, -1, ILibDuktape_readableStream_PipeArray);
duk_push_array(ctx); // [readable][array]
data->pipeArray = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_PipeArray); // [readable]
duk_pop(ctx); // ...
}
}
data->unpipeInProgress = 0;
sem_post(&(data->pipeLock));
}
duk_ret_t ILibDuktape_readableStream_unpipe(duk_context *ctx)
{
int nargs = duk_get_top(ctx);
ILibDuktape_readableStream *data;
ILibDuktape_readableStream_nextWriteablePipe *w, *prev;
duk_push_this(ctx); // [stream]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [stream][ptrs]
duk_push_this(ctx); // [readable]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [readable][ptrs]
data = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
duk_pop(ctx); // [readable]
if (nargs == 0)
{
duk_del_prop_string(ctx, -2, ILibDuktape_readableStream_WritePipes);
data->nextWriteable = NULL;
}
else if (data->nextWriteable != NULL)
{
w = data->nextWriteable;
prev = NULL;
while (w != NULL)
{
if (w->writableStream == duk_get_heapptr(ctx, 0))
{
memset(w, 0, 2 * sizeof(void*));
if (data->nextWriteable == w)
{
//printf("Unpiping object: %p\n", (void*)w);
data->nextWriteable = w->next;
break;
}
else
{
prev->next = w->next;
break;
}
}
else
{
prev = w;
w = w->next;
}
}
}
sem_wait(&(data->pipeLock));
data->unpipeInProgress = 1;
sem_post(&(data->pipeLock));
// We need to pause first
duk_push_this(ctx); // [readable]
duk_get_prop_string(ctx, -1, "pause"); // [readable][pause]
duk_dup(ctx, -2); // [readable][pause][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [readable]
// We must yield, and do this on the next event loop, because we can't unpipe if we're called from a pipe'ed call
ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, -1), nargs == 1 ? duk_get_heapptr(ctx, 0) : NULL }, nargs + 1, ILibDuktape_readableStream_unpipe_later);
return 0;
}
duk_ret_t ILibDuktape_readableStream_isPaused(duk_context *ctx)
@@ -710,11 +795,31 @@ void ILibDuktape_ReadableStream_PipeLockFinalizer(duk_context *ctx, void *stream
sem_destroy(&(ptrs->pipeLock));
duk_pop_2(ctx);
}
ILibDuktape_readableStream* ILibDuktape_InitReadableStream(duk_context *ctx, ILibDuktape_readableStream_PauseResumeHandler OnPause, ILibDuktape_readableStream_PauseResumeHandler OnResume, void *user)
duk_ret_t ILibDuktape_ReadableStream_unshift(duk_context *ctx)
{
ILibDuktape_readableStream *rs;
duk_push_this(ctx); // [stream]
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [stream][ptrs]
rs = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
if (rs->UnshiftHandler == NULL)
{
return(ILibDuktape_Error(ctx, "readable.unshift(): Not Implemented"));
}
else
{
duk_size_t bufferLen;
Duktape_GetBuffer(ctx, 0, &bufferLen);
duk_push_int(ctx, rs->UnshiftHandler(rs, (int)bufferLen, rs->user));
return(1);
}
}
ILibDuktape_readableStream* ILibDuktape_ReadableStream_InitEx(duk_context *ctx, ILibDuktape_readableStream_PauseResumeHandler OnPause, ILibDuktape_readableStream_PauseResumeHandler OnResume, ILibDuktape_readableStream_UnShiftHandler OnUnshift, void *user)
{
ILibDuktape_readableStream *retVal;
ILibDuktape_EventEmitter *emitter;
ILibDuktape_PointerValidation_Init(ctx);
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream)); // [obj][buffer]
duk_dup(ctx, -1); // [obj][buffer][buffer]
duk_put_prop_string(ctx, -3, ILibDuktape_readableStream_RSPTRS); // [obj][buffer]
@@ -722,9 +827,10 @@ ILibDuktape_readableStream* ILibDuktape_InitReadableStream(duk_context *ctx, ILi
memset(retVal, 0, sizeof(ILibDuktape_readableStream));
duk_pop(ctx); // [obj]
duk_push_external_buffer(ctx); // [obj][extBuffer]
retVal->extBuffer = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, "_extBuffer"); // [obj]
duk_push_array(ctx); // [obj][array]
retVal->pipeArray = duk_get_heapptr(ctx, -1);
duk_put_prop_string(ctx, -2, ILibDuktape_readableStream_PipeArray); // [obj]
retVal->ctx = ctx;
retVal->chain = Duktape_GetChain(ctx);
@@ -732,6 +838,7 @@ ILibDuktape_readableStream* ILibDuktape_InitReadableStream(duk_context *ctx, ILi
retVal->user = user;
retVal->PauseHandler = OnPause;
retVal->ResumeHandler = OnResume;
retVal->UnshiftHandler = OnUnshift;
sem_init(&(retVal->pipeLock), 0, 1);
ILibDuktape_CreateIndependentFinalizer(ctx, ILibDuktape_ReadableStream_PipeLockFinalizer);
@@ -743,7 +850,8 @@ ILibDuktape_readableStream* ILibDuktape_InitReadableStream(duk_context *ctx, ILi
ILibDuktape_CreateInstanceMethod(ctx, "pause", ILibDuktape_readableStream_pause, 0);
ILibDuktape_CreateInstanceMethod(ctx, "resume", ILibDuktape_readableStream_resume, 0);
ILibDuktape_CreateEventWithGetter(ctx, "pipe", ILibDuktape_readableStream_pipe_getter);
ILibDuktape_CreateInstanceMethod(ctx, "unpipe", ILibDuktape_readableStream_unpipe, DUK_VARARGS);
ILibDuktape_CreateProperty_InstanceMethod(ctx, "unpipe", ILibDuktape_readableStream_unpipe, DUK_VARARGS);
ILibDuktape_CreateInstanceMethod(ctx, "isPaused", ILibDuktape_readableStream_isPaused, 0);
ILibDuktape_CreateInstanceMethod(ctx, "unshift", ILibDuktape_ReadableStream_unshift, 1);
return retVal;
}