From 70dedb6ce8c6e75063d5142dbfef177b5382e655 Mon Sep 17 00:00:00 2001 From: Bryan Roe Date: Tue, 23 Feb 2021 15:04:44 -0800 Subject: [PATCH] Test 1 --- microscript/ILibDuktape_HttpStream.c | 254 +++++++++++++++++++++------ 1 file changed, 200 insertions(+), 54 deletions(-) diff --git a/microscript/ILibDuktape_HttpStream.c b/microscript/ILibDuktape_HttpStream.c index 0850754..d77b62c 100644 --- a/microscript/ILibDuktape_HttpStream.c +++ b/microscript/ILibDuktape_HttpStream.c @@ -7,7 +7,7 @@ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software +Unless required by applicable law or agreed to in writing, software2 distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and @@ -59,14 +59,13 @@ extern int ILibDeflate(char *buffer, size_t bufferLen, char *compressed, size_t #define ILibDuktape_HTTP2PipedWritable "\xFF_HTTP2PipedWritable" #define ILibDuktape_HTTPStream2Data "\xFF_HTTPStream2Data" #define ILibDuktape_HTTPStream2HTTP "\xFF_HTTPStream2HTTP" -#define ILibDuktape_HTTPStream2IMSG "\xFF_HTTPStream2IMSG" +//#define ILibDuktape_HTTPStream2IMSG "\xFF_HTTPStream2IMSG" #define ILibDuktape_HTTPStream2Socket "\xFF_HTTPStream2Socket" #define ILibDuktape_IMSG2HttpStream "\xFF_IMSG2HttpStream" #define ILibDuktape_IMSG2Ptr "\xFF_IMSG2Ptr" #define ILibDuktape_IMSG2SR "\xFF_IMSG2ServerResponse" #define ILibDuktape_NS2HttpServer "\xFF_Http_NetServer2HttpServer" #define ILibDuktape_Options2ClientRequest "\xFF_Options2ClientRequest" -#define ILibDuktape_PipedReadable "\xFF_PipedReadable" #define ILibDuktape_Socket2AgentStash "\xFF_Socket2AgentStash" #define ILibDuktape_Socket2Agent "\xFF_Socket2Agent" #define ILibDuktape_Socket2AgentKey "\xFF_Socket2AgentKey" @@ -648,16 +647,29 @@ duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx) duk_get_prop_string(ctx, -1, "socket"); // [imsg][httpstream][CR][socket] duk_insert(ctx, -4); // [socket][imsg][httpstream][CR] - duk_push_undefined(ctx); // [socket][imsg][httpstream][CR][undefined] - ILibDuktape_CreateReadonlyProperty(ctx, "socket"); // [socket][imsg][httpstream][CR] + + ILibDuktape_DeleteReadOnlyProperty(ctx, -1, "socket"); + ILibDuktape_DeleteReadOnlyProperty(ctx, -2, ILibDuktape_HTTPStream2Socket); + ILibDuktape_DeleteReadOnlyProperty(ctx, -4, ILibDuktape_Socket2HttpStream); + duk_del_prop_string(ctx, -2, ILibDuktape_HTTP2PipedReadable); + + duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [socket][imsg][httpstream][CR][remove][this] + duk_pcall_method(ctx, 0); duk_pop(ctx); // [socket][imsg][httpstream][CR] + + ILibDuktape_DisplayProperties(ctx, -1); + if (Duktape_GetBooleanProperty(ctx, -2, "connectionCloseSpecified", 0) != 0) { // We cant persist this connection, so close the socket. // Agent is already listening for the 'close' event, so it'll cleanup automatically duk_dup(ctx, -4); // [socket][imsg][httpstream][CR][socket] + + ILibDuktape_DisplayProperties(ctx, -1); + duk_get_prop_string(ctx, -1, "end"); // [socket][imsg][httpstream][CR][socket][end] duk_swap_top(ctx, -2); // [socket][imsg][httpstream][CR][end][this] - duk_call_method(ctx, 0); + duk_call_method(ctx, 0); duk_pop(ctx); // [socket][imsg][httpstream][CR] + return(0); } duk_get_prop_string(ctx, -1, ILibDuktape_CR2Agent); // [socket][imsg][httpstream][CR][Agent] @@ -772,6 +784,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketClosed(duk_context *ctx) duk_push_this(ctx); // [socket] if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream)) { + duk_get_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream); + printf(" REMOVING reference to HttpStream: %s\n", Duktape_GetStringPropertyValue(ctx, -1, ILibDuktape_EventEmitter_FinalizerDebugMessage, "")); + duk_pop(ctx); duk_del_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream); } duk_pop(ctx); // ... @@ -783,6 +798,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx) void *httpStream; duk_dup(ctx, 0); // [socket] + duk_push_string(ctx, "client"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage); duk_push_c_function(ctx, ILibDuktape_HttpStream_http_SocketDiedPrematurely, DUK_VARARGS); duk_put_prop_string(ctx, -2, ILibDuktape_Socket2DiedListener); // [socket] duk_push_this(ctx); // [socket][clientRequest] @@ -850,8 +866,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx) ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "response", -2, "response"); ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue"); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink); + + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink); duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2PipedWritable); // [socket][clientRequest][HTTPStream][destination] @@ -880,6 +897,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx) } httpStream = duk_get_heapptr(ctx, -1); // [socket][clientRequest][httpStream] + duk_push_string(ctx, "client"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage); duk_dup(ctx, -3); // [socket][clientRequest][httpStream][socket] duk_dup(ctx, -2); // [socket][clientRequest][httpStream][socket][httpStream] if (strcmp(Duktape_GetStringPropertyValue(ctx, -2, ILibDuktape_OBJID, "net.socket"), "tls.socket") == 0) { ILibDuktape_WriteID(ctx, "https.httpStream"); } @@ -887,10 +905,10 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx) duk_pop(ctx); // [socket][clientRequest][httpStream] duk_dup(ctx, -2); // [socket][clientRequest][httpStream][clientRequest] duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2CR); // [socket][clientRequest][httpStream] - ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "response", -2, "response"); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -1, -2, "response"); ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue"); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink); ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "write", ILibDuktape_HttpStream_http_SocketResponseReceived); ILibDuktape_EventEmitter_AddOnceEx3(ctx, -3, "close", ILibDuktape_HttpStream_http_OnSocketClosed); // We need to detach HttpStream when socket closes @@ -933,6 +951,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx) duk_ret_t ILibDuktape_HttpStream_http_OnConnectError(duk_context *ctx) { duk_push_this(ctx); // [socket] + if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2CR)) { // Socket was created via 'createConnection' specified by the application @@ -1028,7 +1047,53 @@ duk_ret_t ILibDuktape_HttpStream_http_OnConnectError(duk_context *ctx) // } // return(0); //} +duk_ret_t ILibDuktape_Agent_findConnection(duk_context *ctx) +{ + duk_push_current_function(ctx); // [func] + duk_get_prop_string(ctx, -1, "socket"); // [func][socket] + duk_dup(ctx, 0); // [func][socket][socket] + if (duk_equals(ctx, -2, -1)) + { + duk_push_true(ctx); + } + else + { + duk_push_false(ctx); + } + + return(1); +} +duk_ret_t ILibDuktape_Agent_connectionEnded(duk_context *ctx) +{ + duk_push_this(ctx); // [socket] + duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [socket][remove][this] + duk_call_method(ctx, 0); duk_pop(ctx); // [socket] + ILibDuktape_GetReferenceCount(ctx, -1); + char *key = (char*)Duktape_GetStringPropertyValue(ctx, -1, ILibDuktape_Socket2AgentKey, NULL); + duk_get_prop_string(ctx, -1, ILibDuktape_Socket2Agent); // [socket][agent] + duk_get_prop_string(ctx, -1, "sockets"); // [socket][agent][table] + duk_get_prop_string(ctx, -1, key); // [socket][agent][table][array] + if (duk_is_array(ctx, -1)) + { + duk_push_c_function(ctx, ILibDuktape_Agent_findConnection, DUK_VARARGS); //array][func] + duk_prepare_method_call(ctx, -2, "findIndex"); // [socket][agent][table][array][func][findIndex][this] + duk_dup(ctx, -3); // [socket][agent][table][array][func][findIndex][this][func] + duk_push_this(ctx); // [socket][agent][table][array][func][findIndex][this][func][socket] + duk_put_prop_string(ctx, -2, "socket"); // [socket][agent][table][array][func][findIndex][this][func] + duk_call_method(ctx, 1); // [socket][agent][table][array][func][int] + duk_del_prop_string(ctx, -2, "socket"); + if (duk_get_int(ctx, -1) >= 0) + { + duk_prepare_method_call(ctx, -3, "splice"); // [socket][agent][table][array][func][int][splice][this] + duk_dup(ctx, -3); // [socket][agent][table][array][func][int][splice][this][index] + duk_push_int(ctx, 1); // [socket][agent][table][array][func][int][splice][this][index][i] + duk_call_method(ctx, 2); duk_pop(ctx); // [socket][agent][table][array][func][int] + ILibDuktape_GetReferenceCount(ctx, -6); + } + } + return(0); +} duk_ret_t ILibDuktape_HttpStream_http_OnConnect(duk_context *ctx) { duk_ret_t retVal = 0; @@ -1062,6 +1127,9 @@ duk_ret_t ILibDuktape_HttpStream_http_OnConnect(duk_context *ctx) duk_array_push(ctx, -2); // [socket][agent][table][array] duk_pop_2(ctx); // [socket][agent] + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -2, "end", ILibDuktape_Agent_connectionEnded); + + duk_get_prop_string(ctx, -1, "keepSocketAlive"); // [socket][agent][keepSocketAlive] duk_swap_top(ctx, -2); // [socket][keepSocketAlive][this] duk_dup(ctx, -3); // [socket][keepSocketAlive][this][socket] @@ -1303,11 +1371,11 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx) ILibDuktape_EventEmitter_CreateEventEx(emitter, "timeout"); ILibDuktape_EventEmitter_CreateEventEx(emitter, "upgrade"); ILibDuktape_EventEmitter_CreateEventEx(emitter, "error"); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady); + + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady); ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_ClientRequest_Finalizer); ILibDuktape_CreateProperty_InstanceMethod(ctx, "abort", ILibDuktape_HttpStream_http_request_abort, 0); - if (nargs > 1 && duk_is_function(ctx, 1)) { duk_get_prop_string(ctx, -1, "once"); // [clientRequest][once] @@ -1648,17 +1716,6 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onUpgrade(duk_context *ctx) duk_call_method(ctx, 4); duk_pop(ctx); // [HS] return(0); } -duk_ret_t ILibDuktape_HttpStream_http_server_onConnection_TLSConnect(duk_context *ctx) -{ - //duk_get_prop_string(ctx, -1, ILibDuktape_NS2HttpServer); // [NS][HS] - //duk_get_prop_string(ctx, -1, "emit"); // [NS][HS][emit] - //duk_swap_top(ctx, -2); // [NS][emit][this] - //duk_push_string(ctx, "connection"); // [NS][emit][this][connection] - //duk_dup(ctx, 0); // [NS][emit][this][connection][socket] - //if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.server.onConnection() => Error dispatching connection event "); } - - return(0); -} duk_ret_t ILibDuktape_HttpStream_http_server_onConnectionTimeout(duk_context *ctx) { void *cb = NULL; @@ -1725,25 +1782,33 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onConnection(duk_context *ctx) // Pipe: Socket => HttpStream duk_dup(ctx, 0); // [NS][socket] + duk_push_string(ctx, "server"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage); duk_get_prop_string(ctx, -1, "pipe"); // [NS][socket][pipe] duk_dup(ctx, -2); // [NS][socket][pipe][this] duk_eval_string(ctx, "require('http').createStream(true);"); // [NS][socket][pipe][this][httpStream] + duk_push_string(ctx, "server"); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_FinalizerDebugMessage); duk_get_prop_string(ctx, -5, ILibDuktape_NS2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer] duk_dup(ctx, -1); // [NS][socket][pipe][this][httpStream][httpServer][dup] duk_put_prop_string(ctx, -3, ILibduktape_HttpStream2HttpServer); // [NS][socket][pipe][this][httpStream][httpServer] - ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "checkContinue", -1, "checkContinue"); - ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "checkExpectation", -1, "checkExpectation"); - ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "clientError", -1, "clientError"); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "checkContinue"); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "checkExpectation"); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "clientError"); //ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "close", -1, "close"); - ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "connect", -1, "connect"); - ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "request", -1, "request"); - ILibDuktape_EventEmitter_AddOnEx(ctx, -2, "parseError", ILibDuktape_HttpStream_http_parseError); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "connect"); + ILibDuktape_EventEmitter_ForwardEventEx(ctx, -2, -1, "request"); + + //ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "connect", -1, "connect"); + //ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "request", -1, "request"); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -2, "parseError", ILibDuktape_HttpStream_http_parseError); + if (ILibDuktape_EventEmitter_HasListenersEx(ctx, -1, "upgrade") > 0) { ILibDuktape_EventEmitter_AddOnceEx3(ctx, -2, "upgrade", ILibDuktape_HttpStream_http_server_onUpgrade); } duk_pop(ctx); // [NS][socket][pipe][this][httpStream] - duk_call_method(ctx, 1); duk_pop_2(ctx); // [NS] + duk_call_method(ctx, 1); duk_pop(ctx); // [NS][socket] + + duk_pop(ctx); // [NS] duk_get_prop_string(ctx, -1, ILibDuktape_NS2HttpServer); // [NS][HS] duk_get_prop_string(ctx, -1, "emit"); // [NS][HS][emit] @@ -2715,6 +2780,12 @@ duk_ret_t ILibDuktape_HttpStream_ServerResponse_writeContinue(duk_context *ctx) duk_call_method(ctx, 2); return(0); } +duk_ret_t ILibDuktape_HttpStream_ServerResponse_Finalizer(duk_context *ctx) +{ + duk_push_this(ctx); + duk_del_prop_string(ctx, -1, ILibDuktape_SR2HttpStream); + return(0); +} void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStream, ILibHTTPPacket *header, void *httpStream) { ILibDuktape_HttpStream_ServerResponse_State *state; @@ -2760,6 +2831,8 @@ void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStr ILibDuktape_CreateInstanceMethod(ctx, "setHeader", ILibDuktape_HttpStream_ServerResponse_setHeader, 2); ILibDuktape_CreateInstanceMethod(ctx, "removeHeader", ILibDuktape_HttpStream_ServerResponse_removeHeader, 1); ILibDuktape_CreateInstanceMethod(ctx, "Digest_writeUnauthorized", ILibDuktape_HttpStream_ServerResponse_Digest_SendUnauthorized, DUK_VARARGS); + + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "~", ILibDuktape_HttpStream_ServerResponse_Finalizer); } int ILibDuktape_Digest_IsCorrectRealmAndNonce(duk_context *ctx, void *IMSG, char* realm, int realmLen) @@ -2922,6 +2995,8 @@ duk_ret_t ILibDuktape_HttpStream_IncomingMessage_Digest_ValidatePassword(duk_con } duk_ret_t ILibDuktape_HttpStream_IncomingMessage_finalizer(duk_context *ctx) { + duk_push_this(ctx); + duk_del_prop_string(ctx, -1, ILibDuktape_IMSG2HttpStream); return(0); } void ILibDuktape_HttpStream_AddHeaderDef(duk_context *ctx, struct packetheader *header) @@ -2958,9 +3033,26 @@ void ILibDuktape_HttpStream_AddHeaderDef(duk_context *ctx, struct packetheader * duk_put_prop_string(ctx, -2, "statusMessage"); // [message] } } + void ILibDuktape_HttpStream_IncomingMessage_PUSH(duk_context *ctx, ILibHTTPPacket *header, void *httpstream) { duk_push_object(ctx); // [message] + if (header->Directive != NULL) + { + duk_push_lstring(ctx, header->Directive, header->DirectiveLength); // [message][METHOD] + duk_push_lstring(ctx, header->DirectiveObj, header->DirectiveObjLength);// [message][METHOD][PATH] + duk_push_sprintf(ctx, "%s %s", duk_get_string(ctx, -2), duk_get_string(ctx, -1)); //[METHOD][PATH][STRING] + duk_put_prop_string(ctx, -4, ILibDuktape_EventEmitter_FinalizerDebugMessage);//sage][METHOD][PATH] + duk_pop_2(ctx); // [message] + } + else + { + duk_push_lstring(ctx, header->StatusData, header->StatusDataLength); // [message][STATUS] + duk_push_sprintf(ctx, "%d %s", header->StatusCode, duk_get_string(ctx, -1));// [message][STATUS][STRING] + duk_put_prop_string(ctx, -3, ILibDuktape_EventEmitter_FinalizerDebugMessage);//[message][STATUS] + duk_pop(ctx); // [message] + } + ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_IncomingMessage_finalizer); duk_push_heapptr(ctx, httpstream); // [message][httpStream] duk_dup(ctx, -1); // [message][httpStream][dup] @@ -3043,7 +3135,21 @@ duk_ret_t ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized(duk_context *ctx) return(0); } - +void ILibDuktape_HttpStream_IMSG_EndSink2(duk_context *ctx, void ** args, int argsLen) +{ + duk_push_heapptr(ctx, args[0]); // [IMSG] + duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [IMSG][removeAll][this] + duk_pcall_method(ctx, 0); // [IMSG][ret] + duk_pop_2(ctx); // ... + duk_peval_string_noresult(ctx, "require('events').showReferences()"); +} +duk_ret_t ILibDuktape_HttpStream_IMSG_EndSink(duk_context *ctx) +{ + duk_push_this(ctx); + void *h = duk_get_heapptr(ctx, -1); + ILibDuktape_Immediate(ctx, (void*[]) { h }, 1, ILibDuktape_HttpStream_IMSG_EndSink2); + return(0); +} void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, int InterruptFlag, struct packetheader *header, char *bodyBuffer, int *beginPointer, int endPointer, ILibWebClient_ReceiveStatus recvStatus, void *user1, void *user2, int *PAUSE) { ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)user1; @@ -3084,8 +3190,8 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, duk_push_string(ctx, "connect"); // [emit][this][connect] // [emit][this][request] ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][connect][imsg] // [emit][this][request][imsg] data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data); - duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][connect][imsg][this][imsg] // [emit][this][request][imsg][httpstream][imsg] - duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][connect][imsg] + //duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][connect][imsg][this][imsg] // [emit][this][request][imsg][httpstream][imsg] + //duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][connect][imsg] duk_get_prop_string(ctx, -3, ILibDuktape_HTTP2PipedReadable); // [emit][this][connect][imsg][socket] duk_prepare_method_call(ctx, -1, "unpipe"); // [unpipe][this] duk_pcall_method(ctx, 0); duk_pop(ctx); @@ -3159,8 +3265,13 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, duk_push_string(ctx, "checkContinue"); // [emit][this][checkContinue] ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg] data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data); - duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][checkContinue][imsg][httpstream][imsg] - duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][checkContinue][imsg] + //duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][checkContinue][imsg][httpstream][imsg] + //duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][checkContinue][imsg] + + // Setup an infrastructure event, to remove all subscribers when done + duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink); + duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent); + duk_pcall_method(ctx, 2); duk_pop(ctx); ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][checkContinue][imsg][rsp] if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->checkContinue(): "); } // [rsp][hadListener] @@ -3178,9 +3289,15 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, duk_push_string(ctx, "request"); // [emit][this][request] ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][request][imsg] data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data); - duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg] - duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg] + //duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg] + //duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg] + // Setup an infrastructure event, to remove all subscribers when done + duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink); + duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent); + duk_pcall_method(ctx, 2); duk_pop(ctx); + + ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp] if (duk_pcall_method(ctx, 3) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->request(): "); } duk_pop(ctx); // ... @@ -3192,8 +3309,13 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, duk_push_string(ctx, "request"); // [emit][this][request] ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][request][imsg] data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data); - duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg] - duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg] + //duk_dup(ctx, -3); duk_dup(ctx, -2); // [emit][this][request][imsg][httpstream][imsg] + //duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); duk_pop(ctx); // [emit][this][request][imsg] + + // Setup an infrastructure event, to remove all subscribers when done + duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink); + duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent); + duk_pcall_method(ctx, 2); duk_pop(ctx); ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp] @@ -3251,10 +3373,14 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data); duk_push_pointer(ctx, data); duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2Ptr); - duk_dup(ctx, -3); // [emit][this][response][imsg][httpstream] - duk_dup(ctx, -2); // [emit][this][response][imsg][httpstream][imsg] - duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); // [emit][this][response][imsg][httpstream] - duk_pop(ctx); // [emit][this][response][imsg] + duk_events_setup_on(ctx, -1, "end", ILibDuktape_HttpStream_IMSG_EndSink); + duk_push_true(ctx); duk_put_prop_string(ctx, -2, ILibDuktape_EventEmitter_InfrastructureEvent); + duk_pcall_method(ctx, 2); duk_pop(ctx); + + //duk_dup(ctx, -3); // [emit][this][response][imsg][httpstream] + //duk_dup(ctx, -2); // [emit][this][response][imsg][httpstream][imsg] + //duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2IMSG); // [emit][this][response][imsg][httpstream] + //duk_pop(ctx); // [emit][this][response][imsg] ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized); @@ -3344,7 +3470,7 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, } duk_ret_t ILibDuktape_HttpStream_Finalizer(duk_context *ctx) { - duk_del_prop_string(ctx, 0, ILibDuktape_HTTPStream2IMSG); + //duk_del_prop_string(ctx, 0, ILibDuktape_HTTPStream2IMSG); duk_get_prop_string(ctx, 0, ILibDuktape_HTTPStream2Data); ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetBuffer(ctx, -1, NULL); @@ -3376,6 +3502,16 @@ duk_ret_t ILibDuktape_HttpStream_pipeEvent(duk_context *ctx) duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2PipedReadable); // [httpStream] return(0); } +duk_ret_t ILibDuktape_HttpStream_unpipeEvent(duk_context *ctx) +{ + duk_push_this(ctx); // [stream] + duk_del_prop_string(ctx, -1, ILibDuktape_HTTP2PipedReadable); + duk_prepare_method_call(ctx, -1, "removeAllListeners"); // [stream][remove] + duk_call_method(ctx, 0); + + duk_eval_string_noresult(ctx, "require('events').showReferences();"); + return(0); +} duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx) { int nargs = duk_get_top(ctx); @@ -3417,7 +3553,8 @@ duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx) data->chain = Duktape_GetChain(ctx); - ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "pipe", ILibDuktape_HttpStream_pipeEvent); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "pipe", ILibDuktape_HttpStream_pipeEvent); + ILibDuktape_EventEmitter_AddOn_Infrastructure(ctx, -1, "unpipe", ILibDuktape_HttpStream_unpipeEvent); ILibDuktape_CreateEventWithGetter(ctx, "connectionCloseSpecified", ILibDuktape_HttpStream_connectionCloseSpecified); ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_Finalizer); @@ -3638,7 +3775,7 @@ void ILibDuktape_HttpStream_Agent_reuseSocketEx(duk_context *ctx, void ** args, { duk_push_this(ctx); // [immediate] duk_del_prop_string(ctx, -1, "CR"); - duk_del_prop_string(ctx, -2, "Socket"); + duk_del_prop_string(ctx, -1, "Socket"); duk_pop(ctx); // ... duk_push_heapptr(ctx, args[1]); // [clientRequest] @@ -3662,12 +3799,12 @@ duk_ret_t ILibDuktape_HttpStream_Agent_reuseSocket(duk_context *ctx) { // Yield to the next loop, before we emit a 'socket' event, because emitting this event before anyone has the clientRequest object is pointless void *imm = ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, 0), duk_get_heapptr(ctx, 1) }, 2, ILibDuktape_HttpStream_Agent_reuseSocketEx); - duk_push_heapptr(ctx, imm); // [immediate] - duk_dup(ctx, 1); // [immediate][ClientRequest] - duk_put_prop_string(ctx, -2, "CR"); // [immediate] - duk_dup(ctx, 0); // [immediate][Socket] - duk_put_prop_string(ctx, -2, "Socket"); // [immediate] - duk_pop(ctx); + //duk_push_heapptr(ctx, imm); // [immediate] + //duk_dup(ctx, 1); // [immediate][ClientRequest] + //duk_put_prop_string(ctx, -2, "CR"); // [immediate] + //duk_dup(ctx, 0); // [immediate][Socket] + //duk_put_prop_string(ctx, -2, "Socket"); // [immediate] + //duk_pop(ctx); return(0); } duk_ret_t ILibDuktape_HttpStream_Agent_createConnection_eventSink(duk_context *ctx) @@ -4296,6 +4433,16 @@ void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *s { ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user; if (duk_ctx_shutting_down(state->ctx)) { return; } + + duk_push_heapptr(state->ctx, state->ObjectPtr); // [websocket] + duk_get_prop_string(state->ctx, -1, "decoded"); // [websocket][decoded] + duk_prepare_method_call(state->ctx, -1, "unpipe"); // [websocket][decoded][pmc][this] + duk_pcall_method(state->ctx, 0); duk_pop(state->ctx); // [websocket][decoded] + duk_pop(state->ctx); // [websocket] + ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "decoded"); + ILibDuktape_DeleteReadOnlyProperty(state->ctx, -1, "decoded"); + duk_pop(state->ctx); // ... + if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); } } void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user) @@ -4764,7 +4911,6 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx) ILibDuktape_CreateEventWithGetter(ctx, "bytesSent_ratio", ILibDuktape_WebSocket_bytesSent_ratio); ILibDuktape_CreateEventWithGetter(ctx, "bytesReceived_ratio", ILibDuktape_WebSocket_bytesReceived_ratio); - ILibDuktape_CreateReadonlyProperty(ctx, "decoded"); // [WebSocket] ILibDuktape_CreateFinalizer(ctx, ILibDuktape_httpStream_webSocketStream_finalizer); return(1);