1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2026-01-14 22:43:27 +00:00
This commit is contained in:
Bryan Roe
2021-02-23 15:04:44 -08:00
parent 38f4244bd1
commit 70dedb6ce8

View File

@@ -7,7 +7,7 @@ You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
Unless required by applicable law or agreed to in writing, software2
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
@@ -59,14 +59,13 @@ extern int ILibDeflate(char *buffer, size_t bufferLen, char *compressed, size_t
#define ILibDuktape_HTTP2PipedWritable "\xFF_HTTP2PipedWritable"
#define ILibDuktape_HTTPStream2Data "\xFF_HTTPStream2Data"
#define ILibDuktape_HTTPStream2HTTP "\xFF_HTTPStream2HTTP"
#define ILibDuktape_HTTPStream2IMSG "\xFF_HTTPStream2IMSG"
//#define ILibDuktape_HTTPStream2IMSG "\xFF_HTTPStream2IMSG"
#define ILibDuktape_HTTPStream2Socket "\xFF_HTTPStream2Socket"
#define ILibDuktape_IMSG2HttpStream "\xFF_IMSG2HttpStream"
#define ILibDuktape_IMSG2Ptr "\xFF_IMSG2Ptr"
#define ILibDuktape_IMSG2SR "\xFF_IMSG2ServerResponse"
#define ILibDuktape_NS2HttpServer "\xFF_Http_NetServer2HttpServer"
#define ILibDuktape_Options2ClientRequest "\xFF_Options2ClientRequest"
#define ILibDuktape_PipedReadable "\xFF_PipedReadable"
#define ILibDuktape_Socket2AgentStash "\xFF_Socket2AgentStash"
#define ILibDuktape_Socket2Agent "\xFF_Socket2Agent"
#define ILibDuktape_Socket2AgentKey "\xFF_Socket2AgentKey"
@@ -648,16 +647,29 @@ duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx)
duk_get_prop_string(ctx, -1, "socket"); // [imsg][httpstream][CR][socket]
duk_insert(ctx, -4); // [socket][imsg][httpstream][CR]
duk_push_undefined(ctx); // [socket][imsg][httpstream][CR][undefined]
ILibDuktape_CreateReadonlyProperty(ctx, "socket"); // [socket][imsg][httpstream][CR]
ILibDuktape_DeleteReadOnlyProperty(ctx, -1, "socket");
ILibDuktape_DeleteReadOnlyProperty(ctx, -2, ILibDuktape_HTTPStream2Socket);
ILibDuktape_DeleteReadOnlyProperty(ctx, -4, ILibDuktape_Socket2HttpStream);
duk_del_prop_string(ctx, -2, ILibDuktape_HTTP2PipedReadable);
duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [socket][imsg][httpstream][CR][remove][this]
duk_pcall_method(ctx, 0); duk_pop(ctx); // [socket][imsg][httpstream][CR]
ILibDuktape_DisplayProperties(ctx, -1);
if (Duktape_GetBooleanProperty(ctx, -2, "connectionCloseSpecified", 0) != 0)
{
// We cant persist this connection, so close the socket.
// Agent is already listening for the 'close' event, so it'll cleanup automatically
duk_dup(ctx, -4); // [socket][imsg][httpstream][CR][socket]
ILibDuktape_DisplayProperties(ctx, -1);
duk_get_prop_string(ctx, -1, "end"); // [socket][imsg][httpstream][CR][socket][end]
duk_swap_top(ctx, -2); // [socket][imsg][httpstream][CR][end][this]
duk_call_method(ctx, 0);
duk_call_method(ctx, 0); duk_pop(ctx); // [socket][imsg][httpstream][CR]
return(0);
}
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Agent); // [socket][imsg][httpstream][CR][Agent]
@@ -772,6 +784,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketClosed(duk_context *ctx)
duk_push_this(ctx); // [socket]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream))
{
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream);
printf(" REMOVING reference to HttpStream: %s\n", Duktape_GetStringPropertyValue(ctx, -1, ILibDuktape_EventEmitter_FinalizerDebugMessage, ""));
duk_pop(ctx);
duk_del_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream);
}
duk_pop(ctx); // ...
@@ -783,6 +798,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
void *httpStream;
duk_dup(ctx, 0); // [socket]
duk_push_string(ctx, "client"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage);
duk_push_c_function(ctx, ILibDuktape_HttpStream_http_SocketDiedPrematurely, DUK_VARARGS);
duk_put_prop_string(ctx, -2, ILibDuktape_Socket2DiedListener); // [socket]
duk_push_this(ctx); // [socket][clientRequest]
@@ -850,8 +866,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "response", -2, "response");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2PipedWritable); // [socket][clientRequest][HTTPStream][destination]
@@ -880,6 +897,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
}
httpStream = duk_get_heapptr(ctx, -1); // [socket][clientRequest][httpStream]
duk_push_string(ctx, "client"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage);
duk_dup(ctx, -3); // [socket][clientRequest][httpStream][socket]
duk_dup(ctx, -2); // [socket][clientRequest][httpStream][socket][httpStream]
if (strcmp(Duktape_GetStringPropertyValue(ctx, -2, ILibDuktape_OBJID, "net.socket"), "tls.socket") == 0) { ILibDuktape_WriteID(ctx, "https.httpStream"); }
@@ -887,10 +905,10 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
duk_pop(ctx); // [socket][clientRequest][httpStream]
duk_dup(ctx, -2); // [socket][clientRequest][httpStream][clientRequest]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2CR); // [socket][clientRequest][httpStream]
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "response", -2, "response");
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -1, -2, "response");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "write", ILibDuktape_HttpStream_http_SocketResponseReceived);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -3, "close", ILibDuktape_HttpStream_http_OnSocketClosed); // We need to detach HttpStream when socket closes
@@ -933,6 +951,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
duk_ret_t ILibDuktape_HttpStream_http_OnConnectError(duk_context *ctx)
{
duk_push_this(ctx); // [socket]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2CR))
{
// Socket was created via 'createConnection' specified by the application
@@ -1028,7 +1047,53 @@ duk_ret_t ILibDuktape_HttpStream_http_OnConnectError(duk_context *ctx)
// }
// return(0);
//}
duk_ret_t ILibDuktape_Agent_findConnection(duk_context *ctx)
{
duk_push_current_function(ctx); // [func]
duk_get_prop_string(ctx, -1, "socket"); // [func][socket]
duk_dup(ctx, 0); // [func][socket][socket]
if (duk_equals(ctx, -2, -1))
{
duk_push_true(ctx);
}
else
{
duk_push_false(ctx);
}
return(1);
}
duk_ret_t ILibDuktape_Agent_connectionEnded(duk_context *ctx)
{
duk_push_this(ctx); // [socket]
duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [socket][remove][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [socket]
ILibDuktape_GetReferenceCount(ctx, -1);
char *key = (char*)Duktape_GetStringPropertyValue(ctx, -1, ILibDuktape_Socket2AgentKey, NULL);
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2Agent); // [socket][agent]
duk_get_prop_string(ctx, -1, "sockets"); // [socket][agent][table]
duk_get_prop_string(ctx, -1, key); // [socket][agent][table][array]
if (duk_is_array(ctx, -1))
{
duk_push_c_function(ctx, ILibDuktape_Agent_findConnection, DUK_VARARGS); //array][func]
duk_prepare_method_call(ctx, -2, "findIndex"); // [socket][agent][table][array][func][findIndex][this]
duk_dup(ctx, -3); // [socket][agent][table][array][func][findIndex][this][func]
duk_push_this(ctx); // [socket][agent][table][array][func][findIndex][this][func][socket]
duk_put_prop_string(ctx, -2, "socket"); // [socket][agent][table][array][func][findIndex][this][func]
duk_call_method(ctx, 1); // [socket][agent][table][array][func][int]
duk_del_prop_string(ctx, -2, "socket");
if (duk_get_int(ctx, -1) >= 0)
{
duk_prepare_method_call(ctx, -3, "splice"); // [socket][agent][table][array][func][int][splice][this]
duk_dup(ctx, -3); // [socket][agent][table][array][func][int][splice][this][index]
duk_push_int(ctx, 1); // [socket][agent][table][array][func][int][splice][this][index][i]
duk_call_method(ctx, 2); duk_pop(ctx); // [socket][agent][table][array][func][int]
ILibDuktape_GetReferenceCount(ctx, -6);
}
}
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_OnConnect(duk_context *ctx)
{
duk_ret_t retVal = 0;
@@ -1062,6 +1127,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnConnect(duk_context *ctx)
duk_array_push(ctx, -2); // [socket][agent][table][array]
duk_pop_2(ctx); // [socket][agent]
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -2, "end", ILibDuktape_Agent_connectionEnded);
duk_get_prop_string(ctx, -1, "keepSocketAlive"); // [socket][agent][keepSocketAlive]
duk_swap_top(ctx, -2); // [socket][keepSocketAlive][this]
duk_dup(ctx, -3); // [socket][keepSocketAlive][this][socket]
@@ -1303,11 +1371,11 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
ILibDuktape_EventEmitter_CreateEventEx(emitter, "timeout");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "upgrade");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "error");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_ClientRequest_Finalizer);
ILibDuktape_CreateProperty_InstanceMethod(ctx, "abort", ILibDuktape_HttpStream_http_request_abort, 0);
if (nargs > 1 && duk_is_function(ctx, 1))
{
duk_get_prop_string(ctx, -1, "once"); // [clientRequest][once]
@@ -1648,17 +1716,6 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onUpgrade(duk_context *ctx)
duk_call_method(ctx, 4); duk_pop(ctx); // [HS]
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_server_onConnection_TLSConnect(duk_context *ctx)
{
//duk_get_prop_string(ctx, -1, ILibDuktape_NS2HttpServer); // [NS][HS]
//duk_get_prop_string(ctx, -1, "emit"); // [NS][HS][emit]
//duk_swap_top(ctx, -2); // [NS][emit][this]
//duk_push_string(ctx, "connection"); // [NS][emit][this][connection]
//duk_dup(ctx, 0); // [NS][emit][this][connection][socket]
//if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.server.onConnection() => Error dispatching connection event "); }
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_server_onConnectionTimeout(duk_context *ctx)
{
void *cb = NULL;
@@ -1725,25 +1782,33 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onConnection(duk_context *ctx)
// Pipe: Socket => HttpStream
duk_dup(ctx, 0); // [NS][socket]
duk_push_string(ctx, "server"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage);
duk_get_prop_string(ctx, -1, "pipe"); // [NS][socket][pipe]
duk_dup(ctx, -2); // [NS][socket][pipe][this]
duk_eval_string(ctx, "require('http').createStream(true);"); // [NS][socket][pipe][this][httpStream]
duk_push_string(ctx, "server"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage);
duk_get_prop_string(ctx, -5, ILibDuktape_NS2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer]
duk_dup(ctx, -1); // [NS][socket][pipe][this][httpStream][httpServer][dup]
duk_put_prop_string(ctx, -3, ILibduktape_HttpStream2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer]
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "checkContinue", -1, "checkContinue");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "checkExpectation", -1, "checkExpectation");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "clientError", -1, "clientError");
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "checkContinue");
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "checkExpectation");
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "clientError");
//ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "close", -1, "close");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "connect", -1, "connect");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "request", -1, "request");
ILibDuktape_EventEmitter_AddOnEx(ctx, -2, "parseError", ILibDuktape_HttpStream_http_parseError);
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "connect");
ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "request");
//ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "connect", -1, "connect");
//ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "request", -1, "request");
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -2, "parseError", ILibDuktape_HttpStream_http_parseError);
if (ILibDuktape_EventEmitter_HasListenersEx(ctx, -1, "upgrade") > 0) { ILibDuktape_EventEmitter_AddOnceEx3(ctx, -2, "upgrade", ILibDuktape_HttpStream_http_server_onUpgrade); }
duk_pop(ctx); // [NS][socket][pipe][this][httpStream]
duk_call_method(ctx, 1); duk_pop_2(ctx); // [NS]
duk_call_method(ctx, 1); duk_pop(ctx); // [NS][socket]
duk_pop(ctx); // [NS]
duk_get_prop_string(ctx, -1, ILibDuktape_NS2HttpServer); // [NS][HS]
duk_get_prop_string(ctx, -1, "emit"); // [NS][HS][emit]
@@ -2715,6 +2780,12 @@ duk_ret_t ILibDuktape_HttpStream_ServerResponse_writeContinue(duk_context *ctx)
duk_call_method(ctx, 2);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_ServerResponse_Finalizer(duk_context *ctx)
{
duk_push_this(ctx);
duk_del_prop_string(ctx, -1, ILibDuktape_SR2HttpStream);
return(0);
}
void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStream, ILibHTTPPacket *header, void *httpStream)
{
ILibDuktape_HttpStream_ServerResponse_State *state;
@@ -2760,6 +2831,8 @@ void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStr
ILibDuktape_CreateInstanceMethod(ctx, "setHeader", ILibDuktape_HttpStream_ServerResponse_setHeader, 2);
ILibDuktape_CreateInstanceMethod(ctx, "removeHeader", ILibDuktape_HttpStream_ServerResponse_removeHeader, 1);
ILibDuktape_CreateInstanceMethod(ctx, "Digest_writeUnauthorized", ILibDuktape_HttpStream_ServerResponse_Digest_SendUnauthorized, DUK_VARARGS);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "~", ILibDuktape_HttpStream_ServerResponse_Finalizer);
}
int ILibDuktape_Digest_IsCorrectRealmAndNonce(duk_context *ctx, void *IMSG, char* realm, int realmLen)
@@ -2922,6 +2995,8 @@ duk_ret_t ILibDuktape_HttpStream_IncomingMessage_Digest_ValidatePassword(duk_con
}
duk_ret_t ILibDuktape_HttpStream_IncomingMessage_finalizer(duk_context *ctx)
{
duk_push_this(ctx);
duk_del_prop_string(ctx, -1, ILibDuktape_IMSG2HttpStream);
return(0);
}
void ILibDuktape_HttpStream_AddHeaderDef(duk_context *ctx, struct packetheader *header)
@@ -2958,9 +3033,26 @@ void ILibDuktape_HttpStream_AddHeaderDef(duk_context *ctx, struct packetheader *
duk_put_prop_string(ctx, -2, "statusMessage"); // [message]
}
}
void ILibDuktape_HttpStream_IncomingMessage_PUSH(duk_context *ctx, ILibHTTPPacket *header, void *httpstream)
{
duk_push_object(ctx); // [message]
if (header->Directive != NULL)
{
duk_push_lstring(ctx, header->Directive, header->DirectiveLength); // [message][METHOD]
duk_push_lstring(ctx, header->DirectiveObj, header->DirectiveObjLength);// [message][METHOD][PATH]
duk_push_sprintf(ctx, "%s %s", duk_get_string(ctx, -2), duk_get_string(ctx, -1)); //[METHOD][PATH][STRING]
duk_put_prop_string(ctx, -4, ILibDuktape_EventEmitter_FinalizerDebugMessage);//sage][METHOD][PATH]
duk_pop_2(ctx); // [message]
}
else
{
duk_push_lstring(ctx, header->StatusData, header->StatusDataLength); // [message][STATUS]
duk_push_sprintf(ctx, "%d %s", header->StatusCode, duk_get_string(ctx, -1));// [message][STATUS][STRING]
duk_put_prop_string(ctx, -3, ILibDuktape_EventEmitter_FinalizerDebugMessage);//[message][STATUS]
duk_pop(ctx); // [message]
}
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_IncomingMessage_finalizer);
duk_push_heapptr(ctx, httpstream); // [message][httpStream]
duk_dup(ctx, -1); // [message][httpStream][dup]
@@ -3043,7 +3135,21 @@ duk_ret_t ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized(duk_context *ctx)
return(0);
}
void ILibDuktape_HttpStream_IMSG_EndSink2(duk_context *ctx, void ** args, int argsLen)
{
duk_push_heapptr(ctx, args[0]); // [IMSG]
duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [IMSG][removeAll][this]
duk_pcall_method(ctx, 0); // [IMSG][ret]
duk_pop_2(ctx); // ...
duk_peval_string_noresult(ctx, "require('events').showReferences()");
}
duk_ret_t ILibDuktape_HttpStream_IMSG_EndSink(duk_context *ctx)
{
duk_push_this(ctx);
void *h = duk_get_heapptr(ctx, -1);
ILibDuktape_Immediate(ctx, (void*[]) { h }, 1, ILibDuktape_HttpStream_IMSG_EndSink2);
return(0);
}
void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, int InterruptFlag, struct packetheader *header, char *bodyBuffer, int *beginPointer, int endPointer, ILibWebClient_ReceiveStatus recvStatus, void *user1, void *user2, int *PAUSE)
{
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)user1;
@@ -3084,8 +3190,8 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
duk_push_string(ctx, "connect"); // [emit][this][connect] // [emit][this][request]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][connect][imsg] // [emit][this][request][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][connect][imsg][this][imsg] // [emit][this][request][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][connect][imsg]
//duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][connect][imsg][this][imsg] // [emit][this][request][imsg][httpstream][imsg]
//duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][connect][imsg]
duk_get_prop_string(ctx, -3, ILibDuktape_HTTP2PipedReadable); // [emit][this][connect][imsg][socket]
duk_prepare_method_call(ctx, -1, "unpipe"); // [unpipe][this]
duk_pcall_method(ctx, 0); duk_pop(ctx);
@@ -3159,8 +3265,13 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
duk_push_string(ctx, "checkContinue"); // [emit][this][checkContinue]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][checkContinue][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][checkContinue][imsg]
//duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][checkContinue][imsg][httpstream][imsg]
//duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][checkContinue][imsg]
// Setup an infrastructure event, to remove all subscribers when done
duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink);
duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent);
duk_pcall_method(ctx, 2); duk_pop(ctx);
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg][rsp]
if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->checkContinue(): "); } // [rsp][hadListener]
@@ -3178,9 +3289,15 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
duk_push_string(ctx, "request"); // [emit][this][request]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][request][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg]
//duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg]
//duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg]
// Setup an infrastructure event, to remove all subscribers when done
duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink);
duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent);
duk_pcall_method(ctx, 2); duk_pop(ctx);
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp]
if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->request(): "); }
duk_pop(ctx); // ...
@@ -3192,8 +3309,13 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
duk_push_string(ctx, "request"); // [emit][this][request]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][request][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg]
//duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg]
//duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg]
// Setup an infrastructure event, to remove all subscribers when done
duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink);
duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent);
duk_pcall_method(ctx, 2); duk_pop(ctx);
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp]
@@ -3251,10 +3373,14 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_push_pointer(ctx, data);
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2Ptr);
duk_dup(ctx, -3); // [emit][this][response][imsg][httpstream]
duk_dup(ctx, -2); // [emit][this][response][imsg][httpstream][imsg]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); // [emit][this][response][imsg][httpstream]
duk_pop(ctx); // [emit][this][response][imsg]
duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink);
duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent);
duk_pcall_method(ctx, 2); duk_pop(ctx);
//duk_dup(ctx, -3); // [emit][this][response][imsg][httpstream]
//duk_dup(ctx, -2); // [emit][this][response][imsg][httpstream][imsg]
//duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); // [emit][this][response][imsg][httpstream]
//duk_pop(ctx); // [emit][this][response][imsg]
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized);
@@ -3344,7 +3470,7 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
}
duk_ret_t ILibDuktape_HttpStream_Finalizer(duk_context *ctx)
{
duk_del_prop_string(ctx, 0, ILibDuktape_HTTPStream2IMSG);
//duk_del_prop_string(ctx, 0, ILibDuktape_HTTPStream2IMSG);
duk_get_prop_string(ctx, 0, ILibDuktape_HTTPStream2Data);
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetBuffer(ctx, -1, NULL);
@@ -3376,6 +3502,16 @@ duk_ret_t ILibDuktape_HttpStream_pipeEvent(duk_context *ctx)
duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2PipedReadable); // [httpStream]
return(0);
}
duk_ret_t ILibDuktape_HttpStream_unpipeEvent(duk_context *ctx)
{
duk_push_this(ctx); // [stream]
duk_del_prop_string(ctx, -1, ILibDuktape_HTTP2PipedReadable);
duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [stream][remove]
duk_call_method(ctx, 0);
duk_eval_string_noresult(ctx, "require('events').showReferences();");
return(0);
}
duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx)
{
int nargs = duk_get_top(ctx);
@@ -3417,7 +3553,8 @@ duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx)
data->chain = Duktape_GetChain(ctx);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "pipe", ILibDuktape_HttpStream_pipeEvent);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "pipe", ILibDuktape_HttpStream_pipeEvent);
ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "unpipe", ILibDuktape_HttpStream_unpipeEvent);
ILibDuktape_CreateEventWithGetter(ctx, "connectionCloseSpecified", ILibDuktape_HttpStream_connectionCloseSpecified);
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_Finalizer);
@@ -3638,7 +3775,7 @@ void ILibDuktape_HttpStream_Agent_reuseSocketEx(duk_context *ctx, void ** args,
{
duk_push_this(ctx); // [immediate]
duk_del_prop_string(ctx, -1, "CR");
duk_del_prop_string(ctx, -2, "Socket");
duk_del_prop_string(ctx, -1, "Socket");
duk_pop(ctx); // ...
duk_push_heapptr(ctx, args[1]); // [clientRequest]
@@ -3662,12 +3799,12 @@ duk_ret_t ILibDuktape_HttpStream_Agent_reuseSocket(duk_context *ctx)
{
// Yield to the next loop, before we emit a 'socket' event, because emitting this event before anyone has the clientRequest object is pointless
void *imm = ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, 0), duk_get_heapptr(ctx, 1) }, 2, ILibDuktape_HttpStream_Agent_reuseSocketEx);
duk_push_heapptr(ctx, imm); // [immediate]
duk_dup(ctx, 1); // [immediate][ClientRequest]
duk_put_prop_string(ctx, -2, "CR"); // [immediate]
duk_dup(ctx, 0); // [immediate][Socket]
duk_put_prop_string(ctx, -2, "Socket"); // [immediate]
duk_pop(ctx);
//duk_push_heapptr(ctx, imm); // [immediate]
//duk_dup(ctx, 1); // [immediate][ClientRequest]
//duk_put_prop_string(ctx, -2, "CR"); // [immediate]
//duk_dup(ctx, 0); // [immediate][Socket]
//duk_put_prop_string(ctx, -2, "Socket"); // [immediate]
//duk_pop(ctx);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_Agent_createConnection_eventSink(duk_context *ctx)
@@ -4296,6 +4433,16 @@ void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *s
{
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
if (duk_ctx_shutting_down(state->ctx)) { return; }
duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket]
duk_get_prop_string(state->ctx, -1, "decoded"); // [websocket][decoded]
duk_prepare_method_call(state->ctx, -1, "unpipe"); // [websocket][decoded][pmc][this]
duk_pcall_method(state->ctx, 0); duk_pop(state->ctx); // [websocket][decoded]
duk_pop(state->ctx); // [websocket]
ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "decoded");
ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "decoded");
duk_pop(state->ctx); // ...
if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); }
}
void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user)
@@ -4764,7 +4911,6 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
ILibDuktape_CreateEventWithGetter(ctx, "bytesSent_ratio", ILibDuktape_WebSocket_bytesSent_ratio);
ILibDuktape_CreateEventWithGetter(ctx, "bytesReceived_ratio", ILibDuktape_WebSocket_bytesReceived_ratio);
ILibDuktape_CreateReadonlyProperty(ctx, "decoded"); // [WebSocket]
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_httpStream_webSocketStream_finalizer);
return(1);