mirror of
https://github.com/Ylianst/MeshAgent
synced 2025-12-11 05:43:33 +00:00
MeshAgent for MeshCentral2 Beta2 with improved crypto.
This commit is contained in:
@@ -54,15 +54,17 @@ public:
|
||||
\brief The ReadableStream.pipe() method attaches a WritableStream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached WritableStream.
|
||||
*
|
||||
* Flow control of data will be automatically managed so that the destination WritableStream is not overwhelmed by a faster ReadableStream.
|
||||
\param destination[in] The WritableStream to attach to the ReadableStream.
|
||||
\param destination \<WritableStream\> The WritableStream to attach to the ReadableStream.
|
||||
\param options <Object> Optional parameters:\n
|
||||
<b>dataTypeSkip</b> If set to 1, String values will only emit 'data' events instead of being piped to the WritableStream
|
||||
*/
|
||||
void pipe(WritableStream destination);
|
||||
void pipe(destination[, options]);
|
||||
/*!
|
||||
\brief The ReadableStream.unpipe() method detaches a WritableStream previously attached using the ReadableStream.pipe() method.
|
||||
*
|
||||
\param destination[in] If specified, the WritableStream to detach. If not specified, all streams will be dettached.
|
||||
\param destination \<WritableStream\> If specified, the WritableStream to detach. If not specified, all streams will be dettached.
|
||||
*/
|
||||
void unpipe(WritableStream destination);
|
||||
void unpipe(destination);
|
||||
|
||||
|
||||
/*!
|
||||
@@ -98,12 +100,14 @@ typedef struct ILibDuktape_readableStream_bufferedData
|
||||
{
|
||||
struct ILibDuktape_readableStream_bufferedData *Next;
|
||||
int bufferLen;
|
||||
int Reserved;
|
||||
char buffer[];
|
||||
}ILibDuktape_readableStream_bufferedData;
|
||||
|
||||
void ILibDuktape_readableStream_WriteData_buffer(ILibDuktape_readableStream *stream, char *buffer, int bufferLen)
|
||||
void ILibDuktape_readableStream_WriteData_buffer(ILibDuktape_readableStream *stream, int streamReserved, char *buffer, int bufferLen)
|
||||
{
|
||||
ILibDuktape_readableStream_bufferedData *buffered = (ILibDuktape_readableStream_bufferedData*)ILibMemory_Allocate(bufferLen + sizeof(ILibDuktape_readableStream_bufferedData), 0, NULL, NULL);
|
||||
buffered->Reserved = streamReserved;
|
||||
buffered->bufferLen = bufferLen;
|
||||
memcpy_s(buffered->buffer, bufferLen, buffer, bufferLen);
|
||||
|
||||
@@ -134,8 +138,15 @@ void ILibDuktape_readableStream_WriteData_OnData_ChainThread(void *chain, void *
|
||||
stream->paused = 0;
|
||||
duk_push_heapptr(stream->ctx, stream->OnData); // [func]
|
||||
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
|
||||
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
|
||||
duk_config_buffer(stream->ctx, -1, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
|
||||
if (stream->extBuffer_Reserved == 0)
|
||||
{
|
||||
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
|
||||
duk_config_buffer(stream->ctx, -1, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
|
||||
}
|
||||
else
|
||||
{
|
||||
duk_push_lstring(stream->ctx, stream->extBuffer_buffer, stream->extBuffer_bufferLen);
|
||||
}
|
||||
if (duk_pcall_method(stream->ctx, 1) != 0) // [retVal]
|
||||
{
|
||||
ILibDuktape_Process_UncaughtException(stream->ctx);
|
||||
@@ -256,7 +267,7 @@ void ILibDuktape_readableStream_WriteData_ChainThread(void *chain, void *user)
|
||||
}
|
||||
}
|
||||
|
||||
int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, char* buffer, int bufferLen)
|
||||
int ILibDuktape_readableStream_WriteDataEx(ILibDuktape_readableStream *stream, int streamReserved, char* buffer, int bufferLen)
|
||||
{
|
||||
ILibDuktape_readableStream_nextWriteablePipe *w;
|
||||
int nonNativeCount = 0;
|
||||
@@ -264,42 +275,46 @@ int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, cha
|
||||
|
||||
if (stream->paused != 0)
|
||||
{
|
||||
ILibDuktape_readableStream_WriteData_buffer(stream, buffer, bufferLen);
|
||||
ILibDuktape_readableStream_WriteData_buffer(stream, streamReserved, buffer, bufferLen);
|
||||
return(stream->paused);
|
||||
}
|
||||
|
||||
sem_wait(&(stream->pipeLock));
|
||||
w = stream->nextWriteable;
|
||||
while (w != NULL)
|
||||
if (stream->bypassValue == 0 || stream->bypassValue != streamReserved)
|
||||
{
|
||||
if (w->nativeWritable == 0) { ++nonNativeCount; }
|
||||
else { ++nativeCount; }
|
||||
w = w->next;
|
||||
}
|
||||
w = stream->nextWriteable;
|
||||
if (w != NULL)
|
||||
{
|
||||
if (nonNativeCount > 0)
|
||||
sem_wait(&(stream->pipeLock));
|
||||
w = stream->nextWriteable;
|
||||
while (w != NULL)
|
||||
{
|
||||
// There are piped Pure JavaScript objects... We must context switch to Microstack Thread
|
||||
stream->extBuffer_buffer = buffer;
|
||||
stream->extBuffer_bufferLen = bufferLen;
|
||||
sem_post(&(stream->pipeLock));
|
||||
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
|
||||
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_ChainThread, stream);
|
||||
return(stream->paused);
|
||||
if (w->nativeWritable == 0) { ++nonNativeCount; }
|
||||
else { ++nativeCount; }
|
||||
w = w->next;
|
||||
}
|
||||
else
|
||||
w = stream->nextWriteable;
|
||||
if (w != NULL)
|
||||
{
|
||||
// All piped objects are native, so we can blast out a send
|
||||
stream->pipe_pendingCount = nativeCount;
|
||||
while (w != NULL)
|
||||
if (nonNativeCount > 0)
|
||||
{
|
||||
if (w->nativeWritable != NULL)
|
||||
// There are piped Pure JavaScript objects... We must context switch to Microstack Thread
|
||||
stream->extBuffer_Reserved = streamReserved;
|
||||
stream->extBuffer_buffer = buffer;
|
||||
stream->extBuffer_bufferLen = bufferLen;
|
||||
sem_post(&(stream->pipeLock));
|
||||
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
|
||||
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_ChainThread, stream);
|
||||
return(stream->paused);
|
||||
}
|
||||
else
|
||||
{
|
||||
// All piped objects are native, so we can blast out a send
|
||||
stream->pipe_pendingCount = nativeCount;
|
||||
while (w != NULL)
|
||||
{
|
||||
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
|
||||
switch (ws->WriteSink(ws, buffer, bufferLen, ws->WriteSink_User))
|
||||
if (w->nativeWritable != NULL)
|
||||
{
|
||||
ILibDuktape_WritableStream *ws = (ILibDuktape_WritableStream*)w->nativeWritable;
|
||||
ws->Reserved = streamReserved;
|
||||
switch (ws->WriteSink(ws, buffer, bufferLen, ws->WriteSink_User))
|
||||
{
|
||||
case ILibTransport_DoneState_INCOMPLETE:
|
||||
ws->OnWriteFlushEx = ILibDuktape_readableStream_WriteData_Flush;
|
||||
ws->OnWriteFlushEx_User = stream;
|
||||
@@ -324,28 +339,28 @@ int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, cha
|
||||
--stream->pipe_pendingCount;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
w = w->next;
|
||||
}
|
||||
if (stream->pipe_pendingCount == 0)
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
return(stream->paused);
|
||||
}
|
||||
else
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
|
||||
return(stream->paused);
|
||||
}
|
||||
w = w->next;
|
||||
}
|
||||
if (stream->pipe_pendingCount == 0)
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
return(stream->paused);
|
||||
}
|
||||
else
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
if (stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
|
||||
return(stream->paused);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
sem_post(&(stream->pipeLock));
|
||||
}
|
||||
|
||||
|
||||
if (stream->OnData != NULL)
|
||||
{
|
||||
@@ -353,8 +368,15 @@ int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, cha
|
||||
{
|
||||
duk_push_heapptr(stream->ctx, stream->OnData); // [func]
|
||||
duk_push_heapptr(stream->ctx, stream->object); // [func][this]
|
||||
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
|
||||
duk_config_buffer(stream->ctx, -1, buffer, bufferLen);
|
||||
if (streamReserved == 0)
|
||||
{
|
||||
duk_push_heapptr(stream->ctx, stream->extBuffer); // [func][this][buffer]
|
||||
duk_config_buffer(stream->ctx, -1, buffer, bufferLen);
|
||||
}
|
||||
else
|
||||
{
|
||||
duk_push_lstring(stream->ctx, buffer, bufferLen); // [func][this][string]
|
||||
}
|
||||
if (duk_pcall_method(stream->ctx, 1) != 0) // [retVal]
|
||||
{
|
||||
ILibDuktape_Process_UncaughtException(stream->ctx);
|
||||
@@ -365,6 +387,7 @@ int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, cha
|
||||
{
|
||||
// Need to PAUSE, and context switch to Chain Thread, so we can dispatch into JavaScript
|
||||
if (stream->paused == 0 && stream->PauseHandler != NULL) { stream->paused = 1; stream->PauseHandler(stream, stream->user); }
|
||||
stream->extBuffer_Reserved = streamReserved;
|
||||
stream->extBuffer_buffer = buffer;
|
||||
stream->extBuffer_bufferLen = bufferLen;
|
||||
ILibChain_RunOnMicrostackThread(stream->chain, ILibDuktape_readableStream_WriteData_OnData_ChainThread, stream);
|
||||
@@ -375,7 +398,7 @@ int ILibDuktape_readableStream_WriteData(ILibDuktape_readableStream *stream, cha
|
||||
// If we get here, it means we are writing data, but nobody is going to be receiving it...
|
||||
// So we need to buffer the data, so when we are resumed later, we can retry
|
||||
|
||||
ILibDuktape_readableStream_WriteData_buffer(stream, buffer, bufferLen);
|
||||
ILibDuktape_readableStream_WriteData_buffer(stream, streamReserved, buffer, bufferLen);
|
||||
}
|
||||
else if (stream->OnEnd != NULL)
|
||||
{
|
||||
@@ -477,8 +500,7 @@ int ILibDuktape_readableStream_resume_flush(ILibDuktape_readableStream *rs)
|
||||
while ((buffered = rs->paused_data))
|
||||
{
|
||||
rs->paused_data = buffered->Next;
|
||||
|
||||
if (ILibDuktape_readableStream_WriteData(rs, buffered->buffer, buffered->bufferLen) != 0)
|
||||
if (ILibDuktape_readableStream_WriteDataEx(rs, buffered->Reserved, buffered->buffer, buffered->bufferLen) != 0)
|
||||
{
|
||||
// Send did not complete, so lets exit out, and we'll continue next time.
|
||||
free(buffered);
|
||||
@@ -515,12 +537,18 @@ duk_ret_t ILibDuktape_readableStream_pipe(duk_context *ctx)
|
||||
{
|
||||
ILibDuktape_readableStream *rstream;
|
||||
ILibDuktape_readableStream_nextWriteablePipe *w;
|
||||
int nargs = duk_get_top(ctx);
|
||||
|
||||
duk_push_this(ctx); // [readable]
|
||||
duk_get_prop_string(ctx, -1, ILibDuktape_readableStream_RSPTRS); // [readable][ptrs]
|
||||
rstream = (ILibDuktape_readableStream*)Duktape_GetBuffer(ctx, -1, NULL);
|
||||
duk_pop(ctx); // [readable]
|
||||
|
||||
if (nargs > 1 && duk_is_object(ctx, 1))
|
||||
{
|
||||
rstream->bypassValue = Duktape_GetIntPropertyValue(ctx, 1, "dataTypeSkip", 0);
|
||||
}
|
||||
|
||||
duk_push_object(ctx); // [readable][nextWriteable]
|
||||
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_readableStream_nextWriteablePipe)); // [readable][nextWriteable][ptrBuffer]
|
||||
w = (ILibDuktape_readableStream_nextWriteablePipe*)Duktape_GetBuffer(ctx, -1, NULL);
|
||||
|
||||
Reference in New Issue
Block a user