1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2026-01-08 03:23:27 +00:00

Major agent update.

This commit is contained in:
Ylian Saint-Hilaire
2018-09-05 11:01:17 -07:00
parent 4b5c77b4fd
commit 3c80473a94
174 changed files with 19033 additions and 3307 deletions

View File

@@ -57,6 +57,7 @@ extern void ILibWebClient_ResetWCDO(struct ILibWebClientDataObject *wcdo);
#define ILibDuktape_HTTP2PipedWritable "\xFF_HTTP2PipedWritable"
#define ILibDuktape_HTTPStream2Data "\xFF_HTTPStream2Data"
#define ILibDuktape_HTTPStream2HTTP "\xFF_HTTPStream2HTTP"
#define ILibDuktape_HTTPStream2IMSG "\xFF_HTTPStream2IMSG"
#define ILibDuktape_HTTPStream2Socket "\xFF_HTTPStream2Socket"
#define ILibDuktape_IMSG2HttpStream "\xFF_IMSG2HttpStream"
#define ILibDuktape_IMSG2Ptr "\xFF_IMSG2Ptr"
@@ -130,6 +131,7 @@ typedef struct ILibDuktape_HttpStream_ServerResponse_BufferedImplicit_State
void *ctx;
void *writeStream;
void *serverResponseObj;
void *serverResponseStream;
int endBytes;
int chunk;
size_t bufferLen;
@@ -514,6 +516,9 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx)
{
duk_push_this(ctx); // [imsg]
//ILibDuktape_Log_Object(ctx, -1, "IMSG");
duk_del_prop_string(ctx, -1, ILibDuktape_IMSG2Ptr);
duk_get_prop_string(ctx, -1, ILibDuktape_IMSG2HttpStream); // [imsg][httpstream]
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2CR); // [imsg][httpstream][CR]
@@ -543,6 +548,24 @@ duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx)
duk_get_prop_string(ctx, -1, "keepSocketAlive"); // [socket][imsg][httpstream][CR][Agent][keepSocketAlive]
duk_swap_top(ctx, -2); // [socket][imsg][httpstream][CR][keepSocketAlive][this]
duk_dup(ctx, -6); // [socket][imsg][httpstream][CR][keepSocketAlive][this][socket]
//printf("End Response -->\n");
//if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream))
//{
// duk_get_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream);
// printf(" [Socket: %p] => [HTTPStream: %p]\n", duk_get_heapptr(ctx, -2), duk_get_heapptr(ctx, -1));
// ILibDuktape_Log_Object(ctx, -1, "HTTPStream");
// duk_pop(ctx);
//}
//if (duk_has_prop_string(ctx, -1, ILibDuktape_SOCKET2OPTIONS))
//{
// duk_get_prop_string(ctx, -1, ILibDuktape_SOCKET2OPTIONS);
// ILibDuktape_Log_Object(ctx, -1, "OPTIONS");
// duk_pop(ctx);
//}
//ILibDuktape_Log_Object(ctx, -1, "SOCKET");
//printf("\n");
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][imsg][httpstream][CR]
return(0);
}
@@ -671,6 +694,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
if (data->bodyStream != NULL) { ILibDuktape_readableStream_WriteEnd(data->bodyStream); data->bodyStream = NULL; }
duk_pop(ctx); // [socket][clientRequest][HTTPStream]
ILibDuktape_EventEmitter_DeleteForwardEvent(ctx, -1, "response");
ILibDuktape_EventEmitter_DeleteForwardEvent(ctx, -1, "continue");
// We need to change the events to propagate to the new clientRequest instead of the old one
duk_get_prop_string(ctx, -1, "removeAllListeners"); // [socket][clientRequest][HTTPStream][remove]
duk_dup(ctx, -2); // [socket][clientRequest][HTTPStream][remove][this]
@@ -684,6 +710,8 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
duk_dup(ctx, -2); // [socket][clientRequest][HTTPStream][remove][this]
duk_push_string(ctx, "upgrade"); // [socket][clientRequest][HTTPStream][remove][this][upgrade]
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][clientRequest][HTTPStream]
duk_push_this(ctx); // [socket][clientRequest][HTTPStream][clientRequest]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2CR); // [socket][clientRequest][HTTPStream]
@@ -996,7 +1024,20 @@ void ILibDuktape_HttpStream_http_request_transform(struct ILibDuktape_Transform
}
}
duk_ret_t ILibDuktape_ClientRequest_Finalizer(duk_context *ctx)
{
if (duk_has_prop_string(ctx, 0, ILibDuktape_CR_RequestBuffer))
{
duk_get_prop_string(ctx, 0, ILibDuktape_CR_RequestBuffer);
ILibDuktape_Http_ClientRequest_WriteData *data = (ILibDuktape_Http_ClientRequest_WriteData*)Duktape_GetBuffer(ctx, -1, NULL);
if (data->buffer != NULL)
{
free(data->buffer);
data->buffer = NULL;
}
}
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
{
char *proto;
@@ -1086,7 +1127,7 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
ILibDuktape_EventEmitter_CreateEventEx(emitter, "upgrade");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "error");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_ClientRequest_Finalizer);
if (nargs > 1 && duk_is_function(ctx, 1))
@@ -1465,7 +1506,7 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onConnection(duk_context *ctx)
duk_get_prop_string(ctx, -1, "pipe"); // [NS][socket][pipe]
duk_dup(ctx, -2); // [NS][socket][pipe][this]
duk_eval_string(ctx, "require('http').createStream();"); // [NS][socket][pipe][this][httpStream]
duk_eval_string(ctx, "require('http').createStream();"); // [NS][socket][pipe][this][httpStream]
duk_get_prop_string(ctx, -5, ILibDuktape_NS2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer]
duk_dup(ctx, -1); // [NS][socket][pipe][this][httpStream][httpServer][dup]
duk_put_prop_string(ctx, -3, ILibduktape_HttpStream2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer]
@@ -1813,7 +1854,7 @@ void ILibDuktape_HttpStream_ServerResponse_WriteImplicitHeaders(void *chain, voi
{
int retVal;
ILibDuktape_HttpStream_ServerResponse_BufferedImplicit_State *state = (ILibDuktape_HttpStream_ServerResponse_BufferedImplicit_State*)user;
if (chain != NULL && !ILibDuktape_IsPointerValid(chain, state->serverResponseObj)) { free(user); }
if (!ILibMemory_CanaryOK(state->serverResponseStream)) { free(user); return; }
// We are on Microstack Thread, so we can access the JS object, and write the implicit headers
duk_push_heapptr(state->ctx, state->serverResponseObj); // [SR]
@@ -1956,6 +1997,7 @@ ILibTransport_DoneState ILibDuktape_HttpStream_ServerResponse_WriteSink(struct I
memset(tmp, 0, sizeof(ILibDuktape_HttpStream_ServerResponse_BufferedImplicit_State));
tmp->ctx = stream->ctx;
tmp->serverResponseObj = stream->obj;
tmp->serverResponseStream = stream;
tmp->writeStream = state->writeStream;
tmp->endBytes = stream->endBytes;
tmp->chunk = state->chunkSupported;
@@ -1970,6 +2012,7 @@ ILibTransport_DoneState ILibDuktape_HttpStream_ServerResponse_WriteSink(struct I
memset(buffered, 0, sizeof(ILibDuktape_HttpStream_ServerResponse_BufferedImplicit_State));
buffered->ctx = stream->ctx;
buffered->serverResponseObj = stream->obj;
buffered->serverResponseStream = stream;
buffered->writeStream = state->writeStream;
buffered->bufferLen = bufferLen;
buffered->endBytes = stream->endBytes;
@@ -2059,6 +2102,7 @@ ILibTransport_DoneState ILibDuktape_HttpStream_ServerResponse_WriteSink(struct I
data->ctx = stream->ctx;
data->endBytes = stream->endBytes;
data->serverResponseObj = stream->obj;
data->serverResponseStream = stream;
data->writeStream = state->writeStream;
data->bufferLen = bufferLen;
memcpy_s(data->buffer, bufferLen, buffer, bufferLen);
@@ -2403,8 +2447,6 @@ void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStr
duk_remove(ctx, -2); // [resp][http/s.serverResponse]
duk_put_prop_string(ctx, -2, ILibDuktape_OBJID); // [resp]
ILibDuktape_PointerValidation_Init(ctx);
ILibDuktape_WriteID(ctx, "http.serverResponse");
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_HttpStream_ServerResponse_State)); // [resp][state]
state = (ILibDuktape_HttpStream_ServerResponse_State*)Duktape_GetBuffer(ctx, -1, NULL);
@@ -2666,10 +2708,11 @@ int ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes(ILibDuktape_readableStre
}
void ILibDuktape_HttpStream_DispatchEnd(void *chain, void *user)
{
if (ILibDuktape_IsPointerValid(chain, ((void**)user)[1]) != 0)
if(ILibMemory_CanaryOK(((void**)user)[1]))
{
duk_context *ctx = (duk_context*)((void**)user)[0];
void *heapPtr = ((void**)user)[1];
void *heapPtr = ((ILibDuktape_DuplexStream*)((void**)user)[1])->ParentObject;
((ILibDuktape_HttpStream_Data*)((void**)user)[2])->bodyStream = NULL;
duk_push_heapptr(ctx, heapPtr); // [httpStream]
duk_get_prop_string(ctx, -1, "emit"); // [httpStream][emit]
@@ -2694,7 +2737,7 @@ duk_ret_t ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized(duk_context *ctx)
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetPointerProperty(ctx, 0, ILibDuktape_IMSG2Ptr);
if (data != NULL)
{
if (data->endPropagated == 0) { ILibDuktape_readableStream_WriteEnd(data->bodyStream); }
if ((data->endPropagated == 0) && (data->bodyStream != NULL)) { ILibDuktape_readableStream_WriteEnd(data->bodyStream); }
data->endPropagated = 1;
data->bodyStream = NULL;
}
@@ -2773,28 +2816,38 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
{
if (valLen == 12 && strncasecmp(val, "100-Continue", 12) == 0)
{
duk_push_string(ctx, "checkContinue"); // [emit][this][checkContinue]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg][rsp]
duk_dup(ctx, -1); // [emit][this][checkContinue][imsg][rsp][rsp]
duk_insert(ctx, -6); // [rsp][emit][this][checkContinue][imsg][rsp]
if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->checkContinue(): "); } // [rsp][hadListener]
// Is there a listener for 'checkContinue'?
if (ILibDuktape_EventEmitter_HasListenersEx(ctx, -1, "checkContinue"))
{
duk_push_string(ctx, "checkContinue"); // [emit][this][checkContinue]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][checkContinue][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][checkContinue][imsg]
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg][rsp]
if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->checkContinue(): "); } // [rsp][hadListener]
duk_pop(ctx); // ...
}
else
{
if (!duk_get_boolean(ctx, -1))
{
// No listener, so we must immediately send '100 Continue'
duk_get_prop_string(ctx, -2, "writeContinue"); // [rsp][hadListener][writeContinue]
duk_dup(ctx, -3); // [rsp][hadListener][writeContinue][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [rsp][hadListener]
}
}
duk_pop_2(ctx); // ...
}
else
{
// Nobody listening for 'checkContinue', so we need to respond with 100 Continue
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][rsp]
duk_get_prop_string(ctx, -1, "writeContinue"); // [emit][this][rsp][writeContinue]
duk_swap_top(ctx, -2); // [emit][this][writeContinue][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [emit][this]
// Since nobody was listening for 'checkContinue', need to process this as a 'request'
duk_push_string(ctx, "request"); // [emit][this][request]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][request][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg]
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp]
if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->request(): "); }
duk_pop(ctx); // ...
}
}
}
else
@@ -2852,6 +2905,11 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_push_pointer(ctx, data);
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2Ptr);
duk_dup(ctx, -3); // [emit][this][response][imsg][httpstream]
duk_dup(ctx, -2); // [emit][this][response][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); // [emit][this][response][imsg][httpstream]
duk_pop(ctx); // [emit][this][response][imsg]
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized);
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->response(): "); }
duk_pop(ctx);
@@ -2874,6 +2932,8 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
if (ILibIsRunningOnChainThread(data->chain) != 0)
{
// We're on the Chain Thread, so we can directly emit the 'end' event
data->bodyStream = NULL;
duk_push_heapptr(ctx, data->DS->ParentObject); // [httpStream]
duk_get_prop_string(ctx, -1, "emit"); // [httpStream][emit]
duk_swap_top(ctx, -2); // [emit][this]
@@ -2884,19 +2944,19 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
else
{
// We're on the wrong thread to dispatch the 'end' event, so we have to context switch
void **tmp = (void**)ILibMemory_Allocate(2 * sizeof(void*), 0, NULL, NULL);
void **tmp = (void**)ILibMemory_Allocate(3 * sizeof(void*), 0, NULL, NULL);
tmp[0] = ctx;
tmp[1] = data->DS->ParentObject;
tmp[1] = data->DS;
tmp[2] = data;
ILibChain_RunOnMicrostackThread(data->chain, ILibDuktape_HttpStream_DispatchEnd, tmp);
}
}
}
duk_ret_t ILibDuktape_HttpStream_Finalizer(duk_context *ctx)
{
duk_del_prop_string(ctx, 0, ILibDuktape_HTTPStream2IMSG);
duk_get_prop_string(ctx, 0, ILibDuktape_HTTPStream2Data);
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetBuffer(ctx, -1, NULL);
ILibDuktape_InValidatePointer(Duktape_GetChain(ctx), data);
ILibDuktape_InValidateHeapPointer(ctx, 0);
ILibWebClient_DestroyWebClientDataObject(data->WCDO);
return(0);
@@ -2934,9 +2994,7 @@ duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx)
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2HTTP); // [httpStream]
ILibDuktape_WriteID(ctx, "http.httpStream");
ILibDuktape_EventEmitter *emitter = ILibDuktape_EventEmitter_Create(ctx);
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_HttpStream_Data)); // [httpStream][buffer]
data = (ILibDuktape_HttpStream_Data*)Duktape_GetBuffer(ctx, -1, NULL);
memset(data, 0, sizeof(ILibDuktape_HttpStream_Data));
data = (ILibDuktape_HttpStream_Data*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_HttpStream_Data));
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2Data); // [httpStream]
ILibDuktape_EventEmitter_CreateEventEx(emitter, "end");
@@ -2960,10 +3018,7 @@ duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx)
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "pipe", ILibDuktape_HttpStream_pipeEvent);
ILibDuktape_CreateEventWithGetter(ctx, "connectionCloseSpecified", ILibDuktape_HttpStream_connectionCloseSpecified);
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_Finalizer);
ILibDuktape_ValidatePointer(Duktape_GetChain(ctx), data);
ILibDuktape_ValidateHeapPointer(ctx, -1);
return(1);
}
duk_ret_t ILibDuktape_HttpStream_Agent_getName(duk_context *ctx)
@@ -3113,10 +3168,14 @@ duk_ret_t ILibDuktape_HttpStream_Agent_keepSocketAlive(duk_context *ctx)
duk_get_prop_string(ctx, -1, "requests"); // [key][Agent][requests]
//ILibDuktape_Log_Object(ctx, -1, "Agent/Requests");
if (duk_has_prop_string(ctx, -1, key))
{
// Has Key, check the Array
duk_get_prop_string(ctx, -1, key); // [key][Agent][requests][Array]
//ILibDuktape_Log_Object(ctx, -1, "Agent/Request/ArrayIndex");
duk_get_prop_string(ctx, -1, "shift"); // [key][Agent][requests][Array][shift]
duk_swap_top(ctx, -2); // [key][Agent][requests][shift][this]
duk_call_method(ctx, 0); // [key][Agent][requests][request]
@@ -3485,8 +3544,11 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu
char* maskingKey = NULL;
int FIN;
unsigned char OPCODE;
unsigned char RSV;
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (!ILibMemory_CanaryOK(state)) { return(ILibTransport_DoneState_ERROR); }
if (bufferLen < 2)
{
// We need at least 2 bytes to read enough of the headers to know how long the frame is
@@ -3496,6 +3558,14 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu
hdr = ntohs(((unsigned short*)(buffer))[0]);
FIN = (hdr & WEBSOCKET_FIN) != 0;
OPCODE = (hdr & WEBSOCKET_OPCODE) >> 8;
RSV = (hdr & WEBSOCKET_RSV) >> 8;
if (RSV != 0)
{
char msg[] = "Reserved Field of Websocket was not ZERO";
Duktape_Console_Log(state->ctx, state->chain, ILibDuktape_LogType_Error, msg, sizeof(msg) - 1);
return(ILibTransport_DoneState_ERROR);
}
plen = (unsigned char)(hdr & WEBSOCKET_PLEN);
if (plen == 126)
@@ -3636,7 +3706,8 @@ void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *s
}
void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user)
{
if (chain != NULL && !ILibDuktape_IsPointerValid(chain, user)) { return; }
if (!ILibMemory_CanaryOK(user)) { return; }
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->decodedStream->writableStream->ctx;
@@ -3648,7 +3719,7 @@ void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *
}
void ILibDuktape_httpStream_webSocket_EncodedPauseSink(ILibDuktape_DuplexStream *sender, void *user)
{
printf("WebSocket.Encoded.Pause();\n");
//printf("WebSocket.Encoded.Pause();\n");
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (state->decodedStream->writableStream->pipedReadable_native != NULL && state->decodedStream->writableStream->pipedReadable_native->PauseHandler != NULL)
{
@@ -3669,10 +3740,12 @@ void ILibDuktape_httpStream_webSocket_EncodedPauseSink(ILibDuktape_DuplexStream
}
void ILibDuktape_httpStream_webSocket_EncodedResumeSink_Chain(void *chain, void *user)
{
if (chain != NULL && !ILibDuktape_IsPointerValid(chain, user)) { return; }
if (!ILibMemory_CanaryOK(user)) { return; }
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->decodedStream->writableStream->ctx;
if (state->decodedStream->writableStream->pipedReadable == NULL) { return; }
duk_push_heapptr(ctx, state->decodedStream->writableStream->pipedReadable); // [readable]
duk_get_prop_string(ctx, -1, "resume"); // [readable][resume]
duk_swap_top(ctx, -2); // [resume][this]
@@ -3681,7 +3754,7 @@ void ILibDuktape_httpStream_webSocket_EncodedResumeSink_Chain(void *chain, void
}
void ILibDuktape_httpStream_webSocket_EncodedResumeSink(ILibDuktape_DuplexStream *sender, void *user)
{
printf("WebSocket.Encoded.Resume();\n");
//printf("WebSocket.Encoded.Resume();\n");
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (state->decodedStream->writableStream->pipedReadable_native != NULL && state->decodedStream->writableStream->pipedReadable_native->ResumeHandler != NULL)
{
@@ -3718,7 +3791,8 @@ void ILibDuktape_httpStream_webSocket_DecodedEndSink(ILibDuktape_DuplexStream *s
}
void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *user)
{
if (chain != NULL && !ILibDuktape_IsPointerValid(chain, user)) { return; }
if (!ILibMemory_CanaryOK(user)) { return; }
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->encodedStream->writableStream->ctx;
@@ -3758,7 +3832,7 @@ void ILibDuktape_httpStream_webSocket_DecodedPauseSink(ILibDuktape_DuplexStream
}
void ILibDuktape_httpStream_webSocket_DecodedResumeSink_Chain(void *chain, void *user)
{
if (chain != NULL && !ILibDuktape_IsPointerValid(chain, user)) { return; }
if (!ILibMemory_CanaryOK(user)) { return; }
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->encodedStream->writableStream->ctx;
@@ -3802,11 +3876,10 @@ int ILibDuktape_httpStream_webSocket_DecodedUnshiftSink(ILibDuktape_DuplexStream
duk_ret_t ILibDuktape_httpStream_webSocketStream_finalizer(duk_context *ctx)
{
void *chain = Duktape_GetChain(ctx);
duk_get_prop_string(ctx, 0, ILibDuktape_WebSocket_StatePtr);
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)Duktape_GetBuffer(ctx, -1, NULL);
if (state->encodedStream->writableStream->pipedReadable != NULL)
if (state->encodedStream != NULL && state->encodedStream->writableStream->pipedReadable != NULL)
{
duk_push_heapptr(ctx, state->encodedStream->writableStream->pipedReadable); // [readable]
duk_get_prop_string(ctx, -1, "unpipe"); // [readable][unpipe]
@@ -3815,7 +3888,6 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_finalizer(duk_context *ctx)
duk_call_method(ctx, 1); duk_pop(ctx); // ...
}
ILibDuktape_InValidatePointer(chain, Duktape_GetBuffer(ctx, -1, NULL));
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_sendPing(duk_context *ctx)
@@ -3856,17 +3928,24 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_encodedPiped(duk_context *ctx)
}
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_encoded_Finalizer(duk_context *ctx)
{
duk_get_prop_string(ctx, 0, ILibDuktape_WSENC2WS);
duk_get_prop_string(ctx, -1, ILibDuktape_WebSocket_StatePtr);
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)Duktape_GetBuffer(ctx, -1, NULL);
state->encodedStream = NULL;
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
{
ILibDuktape_WebSocket_State *state;
duk_push_object(ctx); // [WebSocket]
duk_push_object(ctx); // [WebSocket]
ILibDuktape_WriteID(ctx, "http.WebSocketStream");
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_WebSocket_State)); // [WebSocket][data]
state = (ILibDuktape_WebSocket_State*)Duktape_GetBuffer(ctx, -1, NULL);
memset(state, 0, sizeof(ILibDuktape_WebSocket_State));
duk_put_prop_string(ctx, -2, ILibDuktape_WebSocket_StatePtr); // [WebSocket]
ILibDuktape_ValidatePointer(Duktape_GetChain(ctx), state);
state = (ILibDuktape_WebSocket_State*)Duktape_PushBuffer(ctx, sizeof(ILibDuktape_WebSocket_State)); // [WebSocket][data]
duk_put_prop_string(ctx, -2, ILibDuktape_WebSocket_StatePtr); // [WebSocket]
state->ctx = ctx;
state->ObjectPtr = duk_get_heapptr(ctx, -1);
state->chain = Duktape_GetChain(ctx);
@@ -3877,6 +3956,7 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "pipe", ILibDuktape_httpStream_webSocketStream_encodedPiped);
duk_dup(ctx, -2); // [WebSocket][Encoded][WebSocket]
duk_put_prop_string(ctx, -2, ILibDuktape_WSENC2WS); // [WebSocket][Encoded]
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "~", ILibDuktape_httpStream_webSocketStream_encoded_Finalizer);
ILibDuktape_CreateReadonlyProperty(ctx, "encoded"); // [WebSocket]
duk_push_object(ctx); // [WebSocket][Decoded]
@@ -3925,6 +4005,3 @@ void ILibDuktape_HttpStream_Init(duk_context *ctx)
ILibDuktape_ModSearch_AddHandler(ctx, "http", ILibDuktape_HttpStream_http_PUSH);
ILibDuktape_ModSearch_AddHandler(ctx, "https", ILibDuktape_HttpStream_https_PUSH);
}