1
0
mirror of https://github.com/Ylianst/MeshAgent synced 2025-12-31 15:43:57 +00:00

Many improvements.

This commit is contained in:
Ylian Saint-Hilaire
2018-02-11 21:11:58 -08:00
parent 508646044e
commit 4b5c77b4fd
127 changed files with 51725 additions and 35030 deletions

View File

@@ -1,3 +1,19 @@
/*
Copyright 2006 - 2018 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifdef WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -29,7 +45,7 @@ extern void ILibWebClient_ResetWCDO(struct ILibWebClientDataObject *wcdo);
#define ILibDuktape_CR2Agent "\xFF_CR2Agent"
#define ILibDuktape_CR2HTTPStream "\xFF_CR2HTTPStream"
#define ILibDuktape_CR2TLSStream "\xFF_CR2TLSStream"
#define ILibDuktape_CR2WS "\xFF_ClientRequest2WS"
#define ILibDuktape_CR2Transform "\xFF_CR2Transform"
#define ILibDuktape_FUNC "\xFF_FUNC"
#define IILibDuktape_HTTP_HoldingQueue "\xFF_HoldingQueue"
#define ILibDuktape_Http_Server_FixedBuffer "\xFF_Http_Server_FixedBuffer"
@@ -41,7 +57,9 @@ extern void ILibWebClient_ResetWCDO(struct ILibWebClientDataObject *wcdo);
#define ILibDuktape_HTTP2PipedWritable "\xFF_HTTP2PipedWritable"
#define ILibDuktape_HTTPStream2Data "\xFF_HTTPStream2Data"
#define ILibDuktape_HTTPStream2HTTP "\xFF_HTTPStream2HTTP"
#define ILibDuktape_HTTPStream2Socket "\xFF_HTTPStream2Socket"
#define ILibDuktape_IMSG2HttpStream "\xFF_IMSG2HttpStream"
#define ILibDuktape_IMSG2Ptr "\xFF_IMSG2Ptr"
#define ILibDuktape_IMSG2SR "\xFF_IMSG2ServerResponse"
#define ILibDuktape_NS2HttpServer "\xFF_Http_NetServer2HttpServer"
#define ILibDuktape_Options2ClientRequest "\xFF_Options2ClientRequest"
@@ -52,16 +70,17 @@ extern void ILibWebClient_ResetWCDO(struct ILibWebClientDataObject *wcdo);
#define ILibDuktape_Socket2CR "\xFF_Socket2CR"
#define ILibDuktape_Socket2HttpServer "\xFF_Socket2HttpServer"
#define ILibDuktape_Socket2HttpStream "\xFF_Socket2HttpStream"
#define ILibDuktape_Socket2DiedListener "\xFF_Socket2DiedListener"
#define ILibDuktape_Socket2TLS "\xFF_Socket2TLS"
#define ILibDuktape_SR2HttpStream "\xFF_ServerResponse2HttpStream"
#define ILibDuktape_SR2ImplicitHeaders "\xFF_ServerResponse2ImplicitHeaders"
#define ILibDuktape_SR2State "\xFF_ServerResponse2State"
#define ILibDuktape_SRUSER "\xFF_SRUSER"
#define ILibDuktape_SR2WS "\xFF_Http_ServerResponse2WS"
#define ILibDuktape_TLS2CR "\xFF_TLS2CR"
#define ILibDuktape_WebSocket_Client ((void*)0x01)
#define ILibDuktape_WebSocket_Server ((void*)0x02)
#define ILibDuktape_WebSocket_StatePtr "\xFF_WebSocketState"
#define ILibDuktape_WSENC2WS "\xFF_WSENC2WS"
#define ILibDuktape_WS2CR "\xFF_WS2ClientRequest"
#define ILibDuktape_WSDEC2WS "\xFF_WSDEC2WS"
@@ -73,6 +92,8 @@ typedef struct ILibDuktape_Http_ClientRequest_WriteData
int noMoreWrites;
int headersFinished;
int contentLengthSpecified;
int needRetry;
int retryCounter;
size_t bufferWriteLen;
size_t bufferLen;
}ILibDuktape_Http_ClientRequest_WriteData;
@@ -90,6 +111,7 @@ typedef struct ILibDuktape_HttpStream_Data
void *WCDO;
void *chain;
int ConnectMethod;
int endPropagated;
}ILibDuktape_HttpStream_Data;
typedef struct ILibDuktape_HttpStream_ServerResponse_State
@@ -129,6 +151,9 @@ typedef struct ILibDuktape_WebSocket_State
void *ObjectPtr; // Used to emit Ping/Pong events
duk_context *ctx; // Used to emit Ping/Pong events
int noResume;
int closed;
ILibDuktape_DuplexStream *encodedStream;
ILibDuktape_DuplexStream *decodedStream;
}ILibDuktape_WebSocket_State;
@@ -337,10 +362,10 @@ void ILibDuktape_HttpStream_http_ConvertOptionToSend(duk_context *ctx, void *Obj
while (duk_next(ctx, -1, 1))
{
tmp = (char*)duk_get_lstring(ctx, -2, &len);
if (buffer != NULL) { memcpy_s(buffer + bufferLen, ILibMemory_AllocateA_Size(buffer) - bufferLen, tmp, len); (buffer + bufferLen)[len] = ':'; }
if (buffer != NULL) { memcpy_s(buffer + bufferLen, ILibMemory_AllocateA_Size(buffer) - bufferLen, tmp, len); (buffer + bufferLen)[len] = ':'; (buffer + bufferLen)[len + 1] = ' '; }
if (len == 6 && strncasecmp(tmp, "expect", 6) == 0) { expectSpecified = 1; }
if (len == 14 && strncasecmp(tmp, "content-length", 14) == 0) { data->contentLengthSpecified = 1; }
bufferLen += (len + 1); // ('key:')
bufferLen += (len + 2); // ('key: ')
tmp = (char*)duk_get_lstring(ctx, -1, &len);
if (buffer != NULL) { memcpy_s(buffer + bufferLen, ILibMemory_AllocateA_Size(buffer) - bufferLen, tmp, len); (buffer + bufferLen)[len] = '\r'; (buffer + bufferLen)[len + 1] = '\n'; }
bufferLen += (len + 2); // ('value\r\n')
@@ -382,41 +407,21 @@ void ILibDuktape_HttpStream_http_ConvertOptionToSend(duk_context *ctx, void *Obj
duk_pop_n(ctx, 4); // ...
}
duk_ret_t ILibDuktape_HttpStream_http_OnTLSConnect(duk_context *ctx)
duk_ret_t ILibDuktape_HttpStream_http_WebSocket_closed(duk_context *ctx)
{
duk_push_this(ctx); // [TLS]
// ClientRequest Options => DecryptedTransform
duk_get_prop_string(ctx, -1, ILibDuktape_TLS2CR); // [TLS][ClientRequest]
duk_get_prop_string(ctx, -2, "clear"); // [TLS][ClientRequest][DecryptedTransform]
duk_get_prop_string(ctx, -2, ILibDuktape_CR2Options); // [TLS][ClientRequest][DecryptedTransform][Options]
ILibDuktape_HttpStream_http_ConvertOptionToSend(ctx, duk_get_heapptr(ctx, -2), duk_get_heapptr(ctx, -1));
// ClientRequest => DecryptedTransform
duk_pop(ctx); // [TLS][ClientRequest][DecryptedTransform]
duk_get_prop_string(ctx, -2, "pipe"); // [TLS][ClientRequest][DecryptedTransform][pipe]
duk_dup(ctx, -3); // [TLS][ClientRequest][DecryptedTransform][pipe][this]
duk_dup(ctx, -3); // [TLS][ClientRequest][DecryptedTransform][pipe][this][DecryptedTransform]
duk_push_object(ctx); // [TLS][ClientRequest][DecryptedTransform][pipe][this][DecryptedTransform][options]
duk_push_false(ctx); duk_put_prop_string(ctx, -2, "end");
if (duk_pcall_method(ctx, 2) != 0) { return(ILibDuktape_Error(ctx, "http.onTlsConnect() => Error calling pipe on Transform Stream: %s", duk_safe_to_string(ctx, -1))); }
duk_pop_2(ctx); // [TLS][ClientRequest]
// TLS EncryptedTransform => HTTP Stream
duk_get_prop_string(ctx, -2, "encrypted"); // [TLS][ClientRequest][EncryptedTransform]
duk_get_prop_string(ctx, -1, "pipe"); // [TLS][ClientRequest][EncryptedTransform][pipe]
duk_swap_top(ctx, -2); // [TLS][ClientRequest][pipe][this]
duk_get_prop_string(ctx, -3, ILibDuktape_CR2HTTPStream);// [TLS][ClientRequest][pipe][this][http]
if (duk_pcall_method(ctx, 1) != 0) { return(ILibDuktape_Error(ctx, "http.onTlsConnect() => Error calling pipe: %s", duk_safe_to_string(ctx, -1))); }
duk_pop(ctx); // [TLS][ClientRequest]
duk_get_prop_string(ctx, -1, ILibDuktape_CR2HTTPStream);// [TLS][ClientRequest][http]
duk_get_prop_string(ctx, -1, "pipe"); // [TLS][ClientRequest][http][pipe]
duk_swap_top(ctx, -2); // [TLS][ClientRequest][pipe][this]
duk_get_prop_string(ctx, -4, "clear"); // [TLS][ClientRequest][http][this][decryptedTransform]
if (duk_pcall_method(ctx, 1) != 0) { return(ILibDuktape_Error(ctx, "http.onTlsConnect() => Error calling pipe: %s", duk_safe_to_string(ctx, -1))); }
duk_push_this(ctx); // [socket]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2CR))
{
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2CR); // [socket][CR]
duk_push_undefined(ctx); // [socket][CR][undefined]
ILibDuktape_CreateReadonlyProperty(ctx, "socket"); // [socket][CR]
duk_pop(ctx); // [socket]
duk_del_prop_string(ctx, -1, ILibDuktape_Socket2CR);
}
duk_get_prop_string(ctx, -1, "unpipe"); // [socket][unpipe]
duk_swap_top(ctx, -2); // [unpipe][this]
duk_call_method(ctx, 0);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
@@ -436,11 +441,27 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
// We were upgraded to WebSocket, so we need to create a WebSocket Stream, detach the HTTPStream, and emit the event
// Upstream Readable => X => HttpStream
duk_push_this(ctx); // [HTTPStream]
if (duk_has_prop_string(ctx, -1, ILibDuktape_HTTP2CR))
{
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2CR); // [HTTPStream][CR]
duk_del_prop_string(ctx, -1, ILibDuktape_CR2HTTPStream);
duk_pop(ctx); // [HTTPStream]
}
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2PipedReadable); // [HTTPStream][readable]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream))
{
duk_del_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream);
}
duk_get_prop_string(ctx, -1, "unpipe"); // [HTTPStream][readable][unpipe]
duk_dup(ctx, -2); // [HTTPStream][readable][unpipe][this]
duk_call_method(ctx, 0); // [HTTPStream][readable][...]
duk_pop(ctx); // [HTTPStream][readable]
duk_get_prop_string(ctx, -1, "prependOnceListener"); // [HTTPStream][readable][prepend]
duk_dup(ctx, -2); // [HTTPStream][readable][prepend][this]
duk_push_string(ctx, "close"); // [HTTPStream][readable][prepend][this]['close']
duk_push_c_function(ctx, ILibDuktape_HttpStream_http_WebSocket_closed, DUK_VARARGS);
duk_call_method(ctx, 2); duk_pop(ctx); // [HTTPStream][readable]
duk_push_external_buffer(ctx); // [HTTPStream][readable][ext]
duk_config_buffer(ctx, -1, decodedKey, decodedKeyLen);
@@ -453,8 +474,8 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
duk_remove(ctx, -2); // [HTTPStream][readable][websocket]
duk_get_prop_string(ctx, -3, ILibDuktape_HTTP2CR); // [HTTPStream][readable][websocket][clientRequest]
duk_dup(ctx, -2); // [HTTPStream][readable][websocket][clientRequest][websocket]
duk_put_prop_string(ctx, -2, ILibDuktape_CR2WS); // [HTTPStream][readable][websocket][clientRequest]
//duk_dup(ctx, -2); // [HTTPStream][readable][websocket][clientRequest][websocket]
//duk_put_prop_string(ctx, -2, ILibDuktape_CR2WS); // [HTTPStream][readable][websocket][clientRequest]
duk_put_prop_string(ctx, -2, ILibDuktape_WS2CR); // [HTTPStream][readable][websocket]
// Upstream Readable => WebSocket Encoded
@@ -475,9 +496,10 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
duk_call_method(ctx, 1); // [HTTPStream][websocket][...]
duk_pop(ctx); // [HTTPStream][websocket]
}
duk_get_prop_string(ctx, -1, ILibDuktape_WS2CR); // [HTTPStream][websocket][clientRequest]
duk_get_prop_string(ctx, -1, "emit"); // [HTTPStream][websocket][clientRequest][emit]
duk_swap_top(ctx, -2); // [HTTPStream][websocket][emit][this]
duk_push_string(ctx, "upgrade"); // [HTTPStream][websocket][emit][this][upgrade]
duk_dup(ctx, 0); // [HTTPStream][websocket][emit][this][upgrade][imsg]
@@ -486,7 +508,7 @@ duk_ret_t ILibDuktape_HttpStream_http_onUpgrade(duk_context *ctx)
duk_remove(ctx, -2); // [HTTPStream][websocket][emit][this][upgrade][imsg][WS_DEC]
duk_push_null(ctx); // [HTTPStream][websocket][emit][this][upgrade][imsg][WS_DEC][null]
duk_call_method(ctx, 4); duk_pop(ctx); // [HTTPStream][websocket]
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx)
@@ -494,21 +516,34 @@ duk_ret_t ILibDuktape_HttpStream_http_endResponseSink(duk_context *ctx)
duk_push_this(ctx); // [imsg]
duk_get_prop_string(ctx, -1, ILibDuktape_IMSG2HttpStream); // [imsg][httpstream]
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2CR); // [imsg][httpstream][CR]
duk_del_prop_string(ctx, -3, ILibDuktape_IMSG2HttpStream);
duk_del_prop_string(ctx, -2, ILibDuktape_HTTP2CR);
duk_del_prop_string(ctx, -1, ILibDuktape_CR2HTTPStream);
duk_get_prop_string(ctx, -1, "unpipe"); // [imsg][httpstream][CR][unpipe]
duk_dup(ctx, -2); // [imsg][httpstream][CR][unpipe][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [imsg][httpstream][CR]
duk_get_prop_string(ctx, -1, "socket"); // [imsg][httpstream][CR][socket]
duk_insert(ctx, -4); // [socket][imsg][httpstream][CR]
duk_push_undefined(ctx); // [socket][imsg][httpstream][CR][undefined]
ILibDuktape_CreateReadonlyProperty(ctx, "socket"); // [socket][imsg][httpstream][CR]
if (Duktape_GetBooleanProperty(ctx, -2, "connectionCloseSpecified", 0) != 0)
{
// We cant persist this connection, so close the socket.
// Agent is already listening for the 'close' event, so it'll cleanup automatically
duk_get_prop_string(ctx, -1, "socket"); // [imsg][httpstream][CR][socket]
duk_get_prop_string(ctx, -1, "end"); // [imsg][httpstream][CR][socket][end]
duk_swap_top(ctx, -2); // [imsg][httpstream][CR][end][this]
duk_dup(ctx, -4); // [socket][imsg][httpstream][CR][socket]
duk_get_prop_string(ctx, -1, "end"); // [socket][imsg][httpstream][CR][socket][end]
duk_swap_top(ctx, -2); // [socket][imsg][httpstream][CR][end][this]
duk_call_method(ctx, 0);
return(0);
}
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Agent); // [imsg][httpstream][CR][Agent]
duk_get_prop_string(ctx, -1, "keepSocketAlive"); // [imsg][httpstream][CR][Agent][keepSocketAlive]
duk_swap_top(ctx, -2); // [imsg][httpstream][CR][keepSocketAlive][this]
duk_get_prop_string(ctx, -3, "socket"); // [imsg][httpstream][CR][keepSocketAlive][this][socket]
duk_call_method(ctx, 1); duk_pop(ctx); // [imsg][httpstream][CR]
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Agent); // [socket][imsg][httpstream][CR][Agent]
duk_get_prop_string(ctx, -1, "keepSocketAlive"); // [socket][imsg][httpstream][CR][Agent][keepSocketAlive]
duk_swap_top(ctx, -2); // [socket][imsg][httpstream][CR][keepSocketAlive][this]
duk_dup(ctx, -6); // [socket][imsg][httpstream][CR][keepSocketAlive][this][socket]
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][imsg][httpstream][CR]
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_responseSink(duk_context *ctx)
@@ -517,8 +552,92 @@ duk_ret_t ILibDuktape_HttpStream_http_responseSink(duk_context *ctx)
duk_dup(ctx, 0); // [httpstream][imsg]
duk_swap_top(ctx, -2); // [imsg][httpstream]
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2HttpStream); // [imsg]
ILibDuktape_EventEmitter_AddOnceEx3(ctx, 0, "end", ILibDuktape_HttpStream_http_endResponseSink);
duk_pop(ctx);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_SocketDiedPrematurely(duk_context *ctx)
{
duk_push_this(ctx); // [socket]
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2CR); // [socket][clientRequest]
ILibDuktape_Transform *tf = (ILibDuktape_Transform*)Duktape_GetPointerProperty(ctx, -1, ILibDuktape_CR2Transform);
if (tf->target->resumeImmediate != NULL)
{
duk_push_global_object(ctx); // [g]
duk_get_prop_string(ctx, -1, "clearImmediate"); // [g][clearImmediate]
duk_swap_top(ctx, -2); // [clearImmediate][this]
duk_push_heapptr(ctx, tf->target->resumeImmediate); // [clearImmediate][this][immedate]
duk_call_method(ctx, 1); duk_pop(ctx); // ...
tf->target->resumeImmediate = NULL;
}
duk_get_prop_string(ctx, -1, "unpipe"); // [socket][clientRequest][unpipe]
duk_dup(ctx, -2); // [socket][clientRequest][unpipe][this]
duk_call_method(ctx, 0); duk_pop(ctx); // [socket][clientRequest]
ILibDuktape_ReadableStream_DestroyPausedData(tf->target);
// Need to specify some stuff, so the request body will go out again
duk_get_prop_string(ctx, -1, ILibDuktape_CR_RequestBuffer); // [socket][clientRequest][buffer]
ILibDuktape_Http_ClientRequest_WriteData *wdata = (ILibDuktape_Http_ClientRequest_WriteData*)Duktape_GetBuffer(ctx, -1, NULL);
++wdata->retryCounter;
wdata->needRetry = 1;
wdata->bufferWriteLen = wdata->bufferLen;
wdata->headersFinished = 0;
duk_pop(ctx); // [socket][clientRequest]
if (wdata->retryCounter < 3)
{
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Agent); // [socket][clientRequest][agent]
duk_get_prop_string(ctx, -1, "requests"); // [socket][clientReqeust][agent][requests]
duk_get_prop_string(ctx, -4, ILibDuktape_Socket2AgentKey); // [socket][clientRequest][agent][requests][key]
duk_get_prop(ctx, -2); // [socket][clientRequest][agent][requests][array]
if (!duk_is_undefined(ctx, -1))
{
// We need to prepend the clientRequest into the request Queue
duk_get_prop_string(ctx, -1, "unshift"); // [socket][clientRequest][agent][requests][array][unshift]
duk_swap_top(ctx, -2); // [socket][clientRequest][agent][requests][unshift][this]
duk_dup(ctx, -5); // [socket][clientRequest][agent][requests][unshift][this][clientRequest]
duk_call_method(ctx, 1);
}
}
else
{
ILibDuktape_EventEmitter_SetupEmit(ctx, duk_get_heapptr(ctx, -1), "error"); // [emit][this][error]
duk_push_error_object(ctx, DUK_ERR_ERROR, "Too many failed attempts"); // [emit][this][error][err]
duk_call_method(ctx, 2);
}
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_SocketResponseReceived(duk_context *ctx)
{
duk_push_this(ctx); // [httpStream]
duk_get_prop_string(ctx, -1, ILibDuktape_HTTPStream2Socket); // [httpStream][socket]
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2CR); // [httpStream][socket][CR]
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Options); // [httpStream][socket][CR][Options]
duk_del_prop_string(ctx, -1, ILibDuktape_Options2ClientRequest);
duk_pop_2(ctx); // [httpStream][socket]
duk_del_prop_string(ctx, -1, ILibDuktape_Socket2CR);
duk_get_prop_string(ctx, -1, "removeListener"); // [httpStream][socket][removeListener]
duk_swap_top(ctx, -2); // [httpStream][removeListener][this]
duk_push_string(ctx, "close"); // [httpStream][removeListener][this][close]
duk_get_prop_string(ctx, -2, ILibDuktape_Socket2DiedListener); // [httpStream][removeListener][this][close][listener]
duk_call_method(ctx, 2);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_OnSocketClosed(duk_context *ctx)
{
duk_push_this(ctx); // [socket]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream))
{
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream); // [socket][stream]
duk_pop(ctx); // [socket]
duk_del_prop_string(ctx, -1, ILibDuktape_Socket2HttpStream);
}
duk_pop(ctx); // ...
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
@@ -526,17 +645,54 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
void *httpStream;
duk_dup(ctx, 0); // [socket]
duk_push_c_function(ctx, ILibDuktape_HttpStream_http_SocketDiedPrematurely, DUK_VARARGS);
duk_put_prop_string(ctx, -2, ILibDuktape_Socket2DiedListener); // [socket]
duk_push_this(ctx); // [socket][clientRequest]
// Register ourselves for the close event, becuase we'll need to put ourselves back in the Queue if the socket dies before we are done
duk_get_prop_string(ctx, -2, "prependOnceListener"); // [socket][clientRequest][prependOnce]
duk_dup(ctx, -3); // [socket][clientRequest][prependOnce][this]
duk_push_string(ctx, "close"); // [socket][clientRequest][prependOnce][this][close]
duk_get_prop_string(ctx, -5, ILibDuktape_Socket2DiedListener); // [socket][clientRequest][prependOnce][this][close][listener]
duk_call_method(ctx, 2); duk_pop(ctx); // [socket][clientRequest]
duk_put_prop_string(ctx, -2, ILibDuktape_Socket2CR); // [socket]
duk_push_this(ctx); // [socket][clientRequest]
if (duk_has_prop_string(ctx, -2, ILibDuktape_Socket2HttpStream))
{
// HTTP and/or TLS was already setup previously
duk_get_prop_string(ctx, -2, ILibDuktape_Socket2HttpStream); // [socket][clientRequest][HTTPStream]
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "write", ILibDuktape_HttpStream_http_SocketResponseReceived);
duk_get_prop_string(ctx, -1, ILibDuktape_HTTPStream2Data); // [socket][clientRequest][HTTPStream][data]
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetBuffer(ctx, -1, NULL);
ILibWebClient_ResetWCDO(data->WCDO);
if (data->bodyStream != NULL) { ILibDuktape_readableStream_WriteEnd(data->bodyStream); data->bodyStream = NULL; }
duk_pop(ctx); // [socket][clientRequest][HTTPStream]
// We need to change the events to propagate to the new clientRequest instead of the old one
duk_get_prop_string(ctx, -1, "removeAllListeners"); // [socket][clientRequest][HTTPStream][remove]
duk_dup(ctx, -2); // [socket][clientRequest][HTTPStream][remove][this]
duk_push_string(ctx, "response"); // [socket][clientRequest][HTTPStream][remove][this][response]
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][clientRequest][HTTPStream]
duk_get_prop_string(ctx, -1, "removeAllListeners"); // [socket][clientRequest][HTTPStream][remove]
duk_dup(ctx, -2); // [socket][clientRequest][HTTPStream][remove][this]
duk_push_string(ctx, "continue"); // [socket][clientRequest][HTTPStream][remove][this][continue]
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][clientRequest][HTTPStream]
duk_get_prop_string(ctx, -1, "removeAllListeners"); // [socket][clientRequest][HTTPStream][remove]
duk_dup(ctx, -2); // [socket][clientRequest][HTTPStream][remove][this]
duk_push_string(ctx, "upgrade"); // [socket][clientRequest][HTTPStream][remove][this][upgrade]
duk_call_method(ctx, 1); duk_pop(ctx); // [socket][clientRequest][HTTPStream]
duk_push_this(ctx); // [socket][clientRequest][HTTPStream][clientRequest]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTP2CR); // [socket][clientRequest][HTTPStream]
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "response", -2, "response");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
duk_get_prop_string(ctx, -1, ILibDuktape_HTTP2PipedWritable); // [socket][clientRequest][HTTPStream][destination]
duk_get_prop_string(ctx, -3, ILibDuktape_CR2Options); // [socket][clientRequest][HTTPStream][destination][Options]
ILibDuktape_HttpStream_http_ConvertOptionToSend(ctx, duk_get_heapptr(ctx, -2), duk_get_heapptr(ctx, -1));
@@ -573,13 +729,14 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
ILibDuktape_EventEmitter_ForwardEvent(ctx, -1, "continue", -2, "continue");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "upgrade", ILibDuktape_HttpStream_http_onUpgrade);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "response", ILibDuktape_HttpStream_http_responseSink);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "write", ILibDuktape_HttpStream_http_SocketResponseReceived);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -3, "close", ILibDuktape_HttpStream_http_OnSocketClosed); // We need to detach HttpStream when socket closes
duk_put_prop_string(ctx, -2, ILibDuktape_CR2HTTPStream); // [socket][clientRequest]
duk_get_prop_string(ctx, -1, ILibDuktape_CR2Options); // [socket][clientRequest][options]
ILibDuktape_HttpStream_http_ConvertOptionToSend(ctx, duk_get_heapptr(ctx, -3), duk_get_heapptr(ctx, -1));
duk_pop(ctx); // [socket][clientRequest]
// ClientRequest => Socket
duk_get_prop_string(ctx, -1, "pipe"); // [socket][clientRequest][pipe]
duk_swap_top(ctx, -2); // [socket][pipe][this]
@@ -589,6 +746,12 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
if (duk_pcall_method(ctx, 2) != 0) { return(ILibDuktape_Error(ctx, "http.onConnect(): Error Piping with socket ")); }
duk_pop(ctx); // [socket]
// Save this value, so we can unregister 'close' from socket later
duk_push_heapptr(ctx, httpStream); // [socket][httpStream]
duk_dup(ctx, -2); // [socket][httpStream][socket]
duk_put_prop_string(ctx, -2, ILibDuktape_HTTPStream2Socket); // [socket][httpStream]
duk_pop(ctx); // [socket]
// Socket => HttpStream
duk_get_prop_string(ctx, -1, "pipe"); // [socket][pipe]
duk_dup(ctx, -2); // [socket][pipe][this]
@@ -601,8 +764,8 @@ duk_ret_t ILibDuktape_HttpStream_http_OnSocketReady(duk_context *ctx)
duk_get_prop_string(ctx, -1, "pipe"); // [socket][http][pipe]
duk_swap_top(ctx, -2); // [socket][pipe][this]
duk_dup(ctx, -3); // [socket][pipe][this][socket]
if (duk_pcall_method(ctx, 1) != 0) { return(ILibDuktape_Error(ctx, "http.onConnect(): Error calling pipe ")); }
if (duk_pcall_method(ctx, 1) != 0) { return(ILibDuktape_Error(ctx, "http.onConnect(): Error calling pipe ")); }
return(0);
}
duk_ret_t ILibDuktape_HttpStream_http_OnConnectError(duk_context *ctx)
@@ -708,37 +871,7 @@ duk_ret_t ILibDuktape_HttpStream_http_OnConnect(duk_context *ctx)
{
duk_ret_t retVal = 0;
duk_push_this(ctx); // [socket]
duk_get_prop_string(ctx, -1, ILibDuktape_SOCKET2OPTIONS); // [socket][options]
//if (duk_has_prop_string(ctx, -1, "proxy"))
//{
// duk_get_prop_string(ctx, -1, "proxy"); // [socket][options][proxy]
// duk_size_t remoteHostLen;
// char *remoteHost = (char*)Duktape_GetStringPropertyValueEx(ctx, -1, "remoteHost", NULL, &remoteHostLen);
// int remotePort = Duktape_GetIntPropertyValue(ctx, -1, "remotePort", 0);
// if (remoteHost != NULL && remotePort != 0)
// {
// duk_pop_2(ctx); // [socket]
// ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "data", ILibDuktape_HttpStream_http_proxyData);
// duk_get_prop_string(ctx, -1, "write"); // [socket][write]
// duk_swap_top(ctx, -2); // [write][this]
// char *tmp = (char*)ILibMemory_AllocateA((2 * remoteHostLen) + 72);
// sprintf_s(tmp, ILibMemory_AllocateA_Size(tmp), "CONNECT %s:%u HTTP/1.1\r\nProxy-Connection: keep-alive\r\nHost: %s\r\n\r\n", remoteHost, remotePort, remoteHost);
// duk_push_string(ctx, tmp); // [write][this][chunk]
// duk_call_method(ctx, 1);
// return(0);
// }
// else
// {
// duk_pop(ctx); // [socket][options]
// }
//}
duk_pop(ctx); // [socket]
if (duk_has_prop_string(ctx, -1, ILibDuktape_Socket2CR))
{
// Socket was created with passed in createConnection
@@ -781,7 +914,7 @@ void ILibDuktape_HttpStream_http_request_transformPiped(struct ILibDuktape_Trans
ILibDuktape_readableStream_WriteData(sender->target, tmp, tmpLen);
if (data->bufferWriteLen > 0) { ILibDuktape_readableStream_WriteData(sender->target, data->buffer, (int)data->bufferWriteLen); }
}
else if(data->bufferWriteLen > 0)
else if(data->needRetry != 0)
{
if (data->headersFinished)
{
@@ -790,15 +923,27 @@ void ILibDuktape_HttpStream_http_request_transformPiped(struct ILibDuktape_Trans
else
{
data->headersFinished = 1;
tmpLen = sprintf_s(tmp, sizeof(tmp), "Transfer-Encoding: chunked\r\n\r\n%X\r\n", (unsigned int)data->bufferWriteLen);
if (data->contentLengthSpecified)
{
tmpLen = sprintf_s(tmp, sizeof(tmp), "Content-Length: %d\r\n\r\n", (int)data->bufferWriteLen);
}
else
{
tmpLen = sprintf_s(tmp, sizeof(tmp), "Transfer-Encoding: chunked\r\n\r\n%X\r\n", (unsigned int)data->bufferWriteLen);
}
}
ILibDuktape_readableStream_WriteData(sender->target, tmp, tmpLen);
ILibDuktape_readableStream_WriteData(sender->target, data->buffer, (int)data->bufferWriteLen);
ILibDuktape_readableStream_WriteData(sender->target, "\r\n", 2);
free(data->buffer);
data->buffer = NULL;
if (data->bufferWriteLen > 0)
{
ILibDuktape_readableStream_WriteData(sender->target, data->buffer, (int)data->bufferWriteLen);
if (!data->contentLengthSpecified) { ILibDuktape_readableStream_WriteData(sender->target, "\r\n", 2); }
free(data->buffer);
data->buffer = NULL;
}
data->bufferLen = data->bufferWriteLen = 0;
data->needRetry = 0;
}
}
void ILibDuktape_HttpStream_http_request_transform(struct ILibDuktape_Transform *sender, int Reserved, int flush, char *buffer, int bufferLen, void *user)
@@ -820,6 +965,13 @@ void ILibDuktape_HttpStream_http_request_transform(struct ILibDuktape_Transform
{
ILibDuktape_readableStream_WriteData(sender->target, buffer, bufferLen);
}
data->contentLengthSpecified = 1;
if (bufferLen > 0)
{
data->buffer = (char*)ILibMemory_Allocate(bufferLen, 0, NULL, NULL);
data->bufferLen = bufferLen;
memcpy_s(data->buffer, bufferLen, buffer, bufferLen);
}
}
else
{
@@ -844,11 +996,12 @@ void ILibDuktape_HttpStream_http_request_transform(struct ILibDuktape_Transform
}
}
duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
{
char *proto;
duk_size_t protoLen;
int isTLS = 0;
int nargs = duk_get_top(ctx);
if (duk_is_string(ctx, 0))
{
@@ -881,7 +1034,8 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
duk_put_prop_string(ctx, -2, "agent"); // [options][protocol][options]
duk_pop(ctx); // [options][protocol]
}
if ((protoLen == 4 && strncasecmp(proto, "wss:", 4) == 0) || (protoLen == 6 && strncasecmp(proto, "https:", 6) == 0)) { isTLS = 1; }
duk_pop(ctx); // [options]
if (!duk_has_prop_string(ctx, -1, "headers"))
{
@@ -912,15 +1066,16 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
duk_put_prop_string(ctx, -2, ILibDuktape_CR2HTTP); // [clientRequest]
duk_push_false(ctx);
duk_put_prop_string(ctx, -2, ILibDuktape_CR_EndCalled); // [clientRequest]
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_Http_ClientRequest_WriteData));
duk_put_prop_string(ctx, -2, ILibDuktape_CR_EndCalled); // [clientRequest]
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_Http_ClientRequest_WriteData)); // [clientRequest][buffer]
ILibDuktape_Http_ClientRequest_WriteData *wdata = (ILibDuktape_Http_ClientRequest_WriteData*)Duktape_GetBuffer(ctx, -1, NULL);
duk_put_prop_string(ctx, -2, ILibDuktape_CR_RequestBuffer);
duk_put_prop_string(ctx, -2, ILibDuktape_CR_RequestBuffer); // [clientRequest]
memset(wdata, 0, sizeof(ILibDuktape_Http_ClientRequest_WriteData));
ILibDuktape_Transform_Init(ctx, ILibDuktape_HttpStream_http_request_transform, ILibDuktape_HttpStream_http_request_transformPiped, wdata);
duk_push_pointer(ctx, ILibDuktape_Transform_Init(ctx, ILibDuktape_HttpStream_http_request_transform, ILibDuktape_HttpStream_http_request_transformPiped, wdata));
duk_put_prop_string(ctx, -2, ILibDuktape_CR2Transform); // [clientRequest]
ILibDuktape_WriteID(ctx, "https.clientRequest");
ILibDuktape_WriteID(ctx, isTLS ? "https.clientRequest" : "http.clientRequest");
ILibDuktape_EventEmitter *emitter = ILibDuktape_EventEmitter_Create(ctx);
ILibDuktape_EventEmitter_CreateEventEx(emitter, "abort");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "connect");
@@ -930,8 +1085,18 @@ duk_ret_t ILibDuktape_HttpStream_http_request(duk_context *ctx)
ILibDuktape_EventEmitter_CreateEventEx(emitter, "timeout");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "upgrade");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "error");
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "socket", ILibDuktape_HttpStream_http_OnSocketReady);
if (nargs > 1 && duk_is_function(ctx, 1))
{
duk_get_prop_string(ctx, -1, "once"); // [clientRequest][once]
duk_dup(ctx, -2); // [clientRequest][once][this]
duk_push_string(ctx, "response"); // [clientRequest][once][this][response]
duk_dup(ctx, 1); // [clientRequest][once][this][response][handler]
duk_call_method(ctx, 2); duk_pop(ctx); // [clientRequest]
}
duk_dup(ctx, 0); // [clientRequest][options]
duk_put_prop_string(ctx, -2, ILibDuktape_CR2Options); // [clientReqeust]
@@ -1185,7 +1350,6 @@ duk_ret_t ILibDuktape_HttpStream_http_server_upgradeWebsocket(duk_context *ctx)
duk_get_prop_string(ctx, -1, "webSocketStream"); // [http][constructor]
duk_push_lstring(ctx, keyResult, keyResultLen); // [http][constructor][key]
duk_new(ctx, 1); // [http][wss]
duk_push_this(ctx); // [http][wss][socket]
duk_get_prop_string(ctx, -1, "pipe"); // [http][wss][socket][pipe]
@@ -1311,7 +1475,8 @@ duk_ret_t ILibDuktape_HttpStream_http_server_onConnection(duk_context *ctx)
//ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "close", -1, "close");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "connect", -1, "connect");
ILibDuktape_EventEmitter_ForwardEvent(ctx, -2, "request", -1, "request");
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -2, "upgrade", ILibDuktape_HttpStream_http_server_onUpgrade);
if (ILibDuktape_EventEmitter_HasListenersEx(ctx, -1, "upgrade") > 0) { ILibDuktape_EventEmitter_AddOnceEx3(ctx, -2, "upgrade", ILibDuktape_HttpStream_http_server_onUpgrade); }
duk_pop(ctx); // [NS][socket][pipe][this][httpStream]
duk_call_method(ctx, 1); duk_pop_2(ctx); // [NS]
@@ -1396,6 +1561,7 @@ duk_ret_t ILibDuktape_HttpStream_http_server_address(duk_context *ctx)
duk_call_method(ctx, 0); // [httpServer][result]
return(1);
}
duk_ret_t ILibDuktape_HttpStream_http_createServer(duk_context *ctx)
{
ILibDuktape_Http_Server *server;
@@ -1430,7 +1596,7 @@ duk_ret_t ILibDuktape_HttpStream_http_createServer(duk_context *ctx)
if (nargs > 0 && duk_is_function(ctx, 0))
{
ILibDuktape_EventEmitter_AddOn(emitter, "request", duk_require_heapptr(ctx, 1));
ILibDuktape_EventEmitter_AddOn(emitter, "request", duk_require_heapptr(ctx, 0));
}
ILibDuktape_CreateInstanceMethod(ctx, "close", ILibDuktape_HttpStream_http_server_close, DUK_VARARGS);
@@ -1451,18 +1617,17 @@ duk_ret_t ILibDuktape_HttpStream_http_createServer(duk_context *ctx)
duk_get_prop_string(ctx, -1, "createServer"); // [server][nettls][createServer]
duk_swap_top(ctx, -2); // [server][createServer][this]
if (nargs > 0 && duk_is_object(ctx, 0))
if (nargs > 0 && duk_is_object(ctx, 0) && !duk_is_function(ctx, 0))
{
// Options was specified
duk_dup(ctx, 0); // [server][createServer][this][options]
}
duk_push_c_function(ctx, ILibDuktape_HttpStream_http_server_onConnection, DUK_VARARGS);
duk_call_method(ctx, (nargs > 0 && duk_is_object(ctx, 0)) ? 2 : 1); // [server][netServer]
duk_call_method(ctx, (nargs > 0 && duk_is_object(ctx, 0) && !duk_is_function(ctx, 0)) ? 2 : 1); // [server][netServer]
duk_dup(ctx, -2); // [server][netServer][server]
duk_put_prop_string(ctx, -2, ILibDuktape_NS2HttpServer); // [server][netServer]
duk_put_prop_string(ctx, -2, ILibDuktape_Http_Server2NetServer); // [server]
return(1);
}
@@ -1547,6 +1712,13 @@ ILibTransport_DoneState ILibDuktape_HttpStream_WriteSink(ILibDuktape_DuplexStrea
return(ILibTransport_DoneState_INCOMPLETE);
}
duk_push_heapptr(DS->readableStream->ctx, DS->ParentObject); // [httpStream]
duk_get_prop_string(DS->readableStream->ctx, -1, "emit"); // [httpStream][emit]
duk_swap_top(DS->readableStream->ctx, -2); // [emit][this]
duk_push_string(DS->readableStream->ctx, "write"); // [emit][this][write]
if (duk_pcall_method(DS->readableStream->ctx, 1) != 0) { ILibDuktape_Process_UncaughtExceptionEx(DS->readableStream->ctx, "httpStream.write(): Error dispatching 'write' event "); }
duk_pop(DS->readableStream->ctx); // ...
// We're already on Chain Thread, so we can just directly write
int beginPointer = 0;
int PAUSE = 0;
@@ -1571,7 +1743,7 @@ ILibTransport_DoneState ILibDuktape_HttpStream_WriteSink(ILibDuktape_DuplexStrea
duk_swap_top(stream->ctx, -2); // [extBuffer][unshift][this]
duk_push_buffer_object(stream->ctx, -3, 0, (int)bufferLen - beginPointer, DUK_BUFOBJ_NODEJS_BUFFER);// [extBuffer][unshift][this][buffer]
if (duk_pcall_method(stream->ctx, 1) != 0) { MustBuffer = 1; }
duk_pop(stream->ctx); // ...
duk_pop_2(stream->ctx); // ...
}
else
{
@@ -1631,9 +1803,10 @@ void ILibDuktape_HttpStream_EndSink(ILibDuktape_DuplexStream *stream, void *user
{
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)user;
if (data->bodyStream != NULL)
if (data->bodyStream != NULL && data->endPropagated == 0)
{
ILibDuktape_readableStream_WriteEnd(data->bodyStream);
data->endPropagated = 1;
}
}
void ILibDuktape_HttpStream_ServerResponse_WriteImplicitHeaders(void *chain, void *user)
@@ -1786,9 +1959,9 @@ ILibTransport_DoneState ILibDuktape_HttpStream_ServerResponse_WriteSink(struct I
tmp->writeStream = state->writeStream;
tmp->endBytes = stream->endBytes;
tmp->chunk = state->chunkSupported;
if (bufferLen > 0) { memcpy_s(tmp->buffer, bufferLen, buffer, bufferLen); }
if (bufferLen > 0) { memcpy_s(tmp->buffer, bufferLen, buffer, bufferLen); tmp->bufferLen = bufferLen; }
ILibDuktape_HttpStream_ServerResponse_WriteImplicitHeaders(NULL, &tmp);
ILibDuktape_HttpStream_ServerResponse_WriteImplicitHeaders(NULL, tmp);
return(ILibTransport_DoneState_COMPLETE);
}
else
@@ -2216,9 +2389,21 @@ void ILibDuktape_HttpStream_ServerResponse_PUSH(duk_context *ctx, void* writeStr
{
ILibDuktape_HttpStream_ServerResponse_State *state;
duk_push_object(ctx); // [resp]
duk_push_heapptr(ctx, httpStream); // [resp][httpStream]
duk_put_prop_string(ctx, -2, ILibDuktape_SR2HttpStream); // [resp]
duk_push_object(ctx); // [resp]
duk_push_heapptr(ctx, httpStream); // [resp][httpStream]
duk_dup(ctx, -1); // [resp][httpStream][dup]
duk_put_prop_string(ctx, -3, ILibDuktape_SR2HttpStream); // [resp][httpStream]
duk_get_prop_string(ctx, -1, ILibDuktape_HTTPStream2HTTP); // [resp][httpStream][http]
duk_get_prop_string(ctx, -1, ILibDuktape_OBJID); // [resp][httpStream][http][id]
duk_remove(ctx, -2); // [resp][httpStream][id]
duk_get_prop_string(ctx, -1, "concat"); // [resp][httpStream][id][concat]
duk_swap_top(ctx, -2); // [resp][httpStream][concat][this]
duk_push_string(ctx, ".serverResponse"); // [resp][httpStream][concat][this][serverResponse]
if (duk_pcall_method(ctx, 1) != 0) { duk_pop(ctx); duk_push_string(ctx, "http[s].serverResponse"); } // [resp][httpStream][http/s.serverResponse]
duk_remove(ctx, -2); // [resp][http/s.serverResponse]
duk_put_prop_string(ctx, -2, ILibDuktape_OBJID); // [resp]
ILibDuktape_PointerValidation_Init(ctx);
ILibDuktape_WriteID(ctx, "http.serverResponse");
duk_push_fixed_buffer(ctx, sizeof(ILibDuktape_HttpStream_ServerResponse_State)); // [resp][state]
@@ -2408,11 +2593,27 @@ duk_ret_t ILibDuktape_HttpStream_IncomingMessage_Digest_ValidatePassword(duk_con
duk_push_int(ctx, retVal);
return(1);
}
duk_ret_t ILibDuktape_HttpStream_IncomingMessage_finalizer(duk_context *ctx)
{
return(0);
}
void ILibDuktape_HttpStream_IncomingMessage_PUSH(duk_context *ctx, ILibHTTPPacket *header, void *httpstream)
{
duk_push_object(ctx); // [message]
ILibDuktape_CreateFinalizer(ctx, ILibDuktape_HttpStream_IncomingMessage_finalizer);
duk_push_heapptr(ctx, httpstream); // [message][httpStream]
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2HttpStream); // [message]
duk_dup(ctx, -1); // [message][httpStream][dup]
duk_put_prop_string(ctx, -3, ILibDuktape_IMSG2HttpStream); // [message][httpStream]
duk_get_prop_string(ctx, -1, ILibDuktape_HTTPStream2HTTP); // [message][httpStream][http]
duk_remove(ctx, -2); // [message][http]
duk_get_prop_string(ctx, -1, ILibDuktape_OBJID); // [message][http][id]
duk_get_prop_string(ctx, -1, "concat"); // [message][http][id][concat]
duk_swap_top(ctx, -2); // [message][http][concat][this]
duk_push_string(ctx, ".IncomingMessage"); // [message][http][concat][this][.IncomingMessage]
if (duk_pcall_method(ctx, 1) != 0) { duk_pop(ctx); duk_push_string(ctx, "http[s].IncomingMessage"); }
duk_remove(ctx, -2); // [message][http/s.IncomingMessage]
duk_put_prop_string(ctx, -2, ILibDuktape_OBJID); // [message]
duk_push_object(ctx); // [message][headers]
packetheader_field_node *node = header->FirstField;
while (node != NULL)
@@ -2479,7 +2680,26 @@ void ILibDuktape_HttpStream_DispatchEnd(void *chain, void *user)
}
free(user);
}
void ILibDuktape_HttpStream_ForceDisconnect(duk_context *ctx, void ** args, int argsLen)
{
duk_push_heapptr(ctx, args[0]);
duk_get_prop_string(ctx, -1, "end");
duk_swap_top(ctx, -2);
if (duk_pcall_method(ctx, 0) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "httpStream.OnUpgrade(): "); }
duk_pop(ctx);
}
duk_ret_t ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized(duk_context *ctx)
{
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)Duktape_GetPointerProperty(ctx, 0, ILibDuktape_IMSG2Ptr);
if (data != NULL)
{
if (data->endPropagated == 0) { ILibDuktape_readableStream_WriteEnd(data->bodyStream); }
data->endPropagated = 1;
data->bodyStream = NULL;
}
return(0);
}
void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject, int InterruptFlag, struct packetheader *header, char *bodyBuffer, int *beginPointer, int endPointer, ILibWebClient_ReceiveStatus recvStatus, void *user1, void *user2, int *PAUSE)
{
ILibDuktape_HttpStream_Data *data = (ILibDuktape_HttpStream_Data*)user1;
@@ -2525,7 +2745,22 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][upgrade][imsg]
ILibDuktape_HttpStream_ServerResponse_PUSH(ctx, data->DS->writableStream->pipedReadable, header, data->DS->ParentObject); // [emit][this][request][imsg][rsp]
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2SR);
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->upgrade(): "); }
if (duk_pcall_method(ctx, 2) != 0)
{ ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->upgrade(): "); }
else
{
if (!duk_get_boolean(ctx, -1))
{
// No upgrade listener... Close connection
printf("\n\nNo Upgrade Listener\n");
void *imm = ILibDuktape_Immediate(ctx, (void*[]) { data->DS->writableStream->pipedReadable }, 1, ILibDuktape_HttpStream_ForceDisconnect);
duk_push_heapptr(ctx, imm);
duk_push_heapptr(ctx, data->DS->writableStream->pipedReadable);
duk_put_prop_string(ctx, -2, "r");
duk_pop_2(ctx);
return;
}
}
duk_pop(ctx); // ...
}
else
@@ -2604,15 +2839,27 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
case 101:
duk_push_string(ctx, "upgrade"); // [emit][this][upgrade]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][upgrade][imsg]
duk_del_prop_string(ctx, -1, ILibDuktape_IMSG2HttpStream);
duk_insert(ctx, -4); // [imsg][emit][this][upgrade]
duk_dup(ctx, -4); // [imsg][emit][this][upgrade][imsg]
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->upgrade(): "); }
duk_pop(ctx);
duk_pop(ctx);
break;
default:
duk_push_string(ctx, "response"); // [emit][this][response]
ILibDuktape_HttpStream_IncomingMessage_PUSH(ctx, header, data->DS->ParentObject); // [emit][this][response][imsg]
data->bodyStream = ILibDuktape_ReadableStream_InitEx(ctx, ILibDuktape_HttpStream_IncomingMessage_PauseSink, ILibDuktape_HttpStream_IncomingMessage_ResumeSink, ILibDuktape_HttpStream_IncomingMessage_UnshiftBytes, data);
duk_push_pointer(ctx, data);
duk_put_prop_string(ctx, -2, ILibDuktape_IMSG2Ptr);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "~", ILibDuktape_HttpStream_OnReceive_bodyStreamFinalized);
if (duk_pcall_method(ctx, 2) != 0) { ILibDuktape_Process_UncaughtExceptionEx(ctx, "http.httpStream.onReceive->response(): "); }
duk_pop(ctx);
if (bodyBuffer != NULL && endPointer > 0)
{
ILibDuktape_readableStream_WriteData(data->bodyStream, bodyBuffer + *beginPointer, endPointer);
}
break;
}
}
@@ -2620,6 +2867,7 @@ void ILibDuktape_HttpStream_OnReceive(ILibWebClient_StateObject WebStateObject,
if (data->bodyStream != NULL && recvStatus == ILibWebClient_ReceiveStatus_Complete)
{
ILibDuktape_readableStream_WriteEnd(data->bodyStream);
data->endPropagated = 1;
}
if (recvStatus == ILibWebClient_ReceiveStatus_Complete)
{
@@ -2651,7 +2899,6 @@ duk_ret_t ILibDuktape_HttpStream_Finalizer(duk_context *ctx)
ILibDuktape_InValidateHeapPointer(ctx, 0);
ILibWebClient_DestroyWebClientDataObject(data->WCDO);
return(0);
}
@@ -2702,6 +2949,7 @@ duk_ret_t ILibduktape_HttpStream_create(duk_context *ctx)
ILibDuktape_EventEmitter_CreateEventEx(emitter, "clientError");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "request");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "connect");
ILibDuktape_EventEmitter_CreateEventEx(emitter, "write");
data->DS = ILibDuktape_DuplexStream_InitEx(ctx, ILibDuktape_HttpStream_WriteSink, ILibDuktape_HttpStream_EndSink,
NULL, NULL, NULL, data);
@@ -2772,7 +3020,7 @@ void ILibDuktape_RemoveObjFromTable(duk_context *ctx, duk_idx_t tableIdx, char *
duk_ret_t ILibDuktape_HttpStream_Agent_socketEndSink(duk_context *ctx)
{
duk_push_this(ctx); // [socket]
printf("socket has closed: %p\n", duk_get_heapptr(ctx, -1));
//printf("socket has closed: %p\n", duk_get_heapptr(ctx, -1));
duk_get_prop_string(ctx, -1, ILibDuktape_Socket2Agent); // [socket][agent]
duk_get_prop_string(ctx, -2, ILibDuktape_Socket2AgentKey); // [socket][agent][key]
char *key = Duktape_GetBuffer(ctx, -1, NULL);
@@ -2806,10 +3054,10 @@ duk_ret_t ILibDuktape_HttpStream_Agent_socketEndSink(duk_context *ctx)
duk_get_prop_string(ctx, -1, "createConnection"); // [socket][agent][agent][createConnection]
duk_swap_top(ctx, -2); // [socket][agent][createConnection][this]
duk_get_prop_string(ctx, -4, "\xFF_NET_SOCKET2OPTIONS"); // [socket][agent][createConnection][this][options]
duk_call_method(ctx, 1); // [socket][agent][newsocket]
duk_push_c_function(ctx, ILibDuktape_HttpStream_http_OnConnect, DUK_VARARGS); // We need to register here, because TLS/NonTLS have different event names
duk_call_method(ctx, 2); // [socket][agent][newsocket]
duk_swap_top(ctx, -2); // [socket][newsocket][agent]
duk_put_prop_string(ctx, -2, ILibDuktape_Socket2Agent); // [socket][newsocket]
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "connect", ILibDuktape_HttpStream_http_OnConnect);
ILibDuktape_EventEmitter_AddOnceEx3(ctx, -1, "error", ILibDuktape_HttpStream_http_OnConnectError);
}
}
@@ -2882,6 +3130,7 @@ duk_ret_t ILibDuktape_HttpStream_Agent_keepSocketAlive(duk_context *ctx)
duk_dup(ctx, -4); // [key][Agent][requests][request][reuseSocket][this][socket][request]
duk_call_method(ctx, 2); // [key][Agent][requests][request][retVal]
retVal = 1;
duk_pop(ctx); // [key][Agent][requests][request]
}
}
if (retVal == 0)
@@ -2917,6 +3166,11 @@ duk_ret_t ILibDuktape_HttpStream_Agent_keepSocketAlive(duk_context *ctx)
}
void ILibDuktape_HttpStream_Agent_reuseSocketEx(duk_context *ctx, void ** args, int argsLen)
{
duk_push_this(ctx); // [immediate]
duk_del_prop_string(ctx, -1, "CR");
duk_del_prop_string(ctx, -2, "Socket");
duk_pop(ctx); // ...
duk_push_heapptr(ctx, args[1]); // [clientRequest]
duk_push_heapptr(ctx, args[0]); // [clientRequest][socket]
@@ -2926,6 +3180,7 @@ void ILibDuktape_HttpStream_Agent_reuseSocketEx(duk_context *ctx, void ** args,
duk_call_method(ctx, 1); duk_pop(ctx); // [clientRequest][socket]
ILibDuktape_CreateReadonlyProperty(ctx, "socket"); // [clientRequest]
duk_get_prop_string(ctx, -1, "emit"); // [clientRequest][emit]
duk_swap_top(ctx, -2); // [emit][this]
duk_push_string(ctx, "socket"); // [emit][this][name]
@@ -2936,7 +3191,13 @@ void ILibDuktape_HttpStream_Agent_reuseSocketEx(duk_context *ctx, void ** args,
duk_ret_t ILibDuktape_HttpStream_Agent_reuseSocket(duk_context *ctx)
{
// Yield to the next loop, before we emit a 'socket' event, because emitting this event before anyone has the clientRequest object is pointless
ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, 0), duk_get_heapptr(ctx, 1) }, 2, ILibDuktape_HttpStream_Agent_reuseSocketEx);
void *imm = ILibDuktape_Immediate(ctx, (void*[]) { duk_get_heapptr(ctx, 0), duk_get_heapptr(ctx, 1) }, 2, ILibDuktape_HttpStream_Agent_reuseSocketEx);
duk_push_heapptr(ctx, imm); // [immediate]
duk_dup(ctx, 1); // [immediate][ClientRequest]
duk_put_prop_string(ctx, -2, "CR"); // [immediate]
duk_dup(ctx, 0); // [immediate][Socket]
duk_put_prop_string(ctx, -2, "Socket"); // [immediate]
duk_pop(ctx);
return(0);
}
duk_ret_t ILibDuktape_HttpStream_Agent_createConnection_eventSink(duk_context *ctx)
@@ -3144,8 +3405,15 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_WriteWebSocketPacket(IL
if (ILibIsRunningOnChainThread(state->chain) != 0)
{
// We're on the Duktape Thread, so we can just call write multiple times, cuz we won't interleave with JavaScript
ILibDuktape_DuplexStream_WriteData(state->encodedStream, header, headerLen);
retVal = ILibDuktape_DuplexStream_WriteData(state->encodedStream, buffer, bufferLen) == 0 ? ILibTransport_DoneState_COMPLETE : ILibTransport_DoneState_INCOMPLETE;
if (bufferLen > 0)
{
ILibDuktape_DuplexStream_WriteData(state->encodedStream, header, headerLen);
retVal = ILibDuktape_DuplexStream_WriteData(state->encodedStream, buffer, bufferLen) == 0 ? ILibTransport_DoneState_COMPLETE : ILibTransport_DoneState_INCOMPLETE;
}
else
{
retVal = ILibDuktape_DuplexStream_WriteData(state->encodedStream, header, headerLen) == 0 ? ILibTransport_DoneState_COMPLETE : ILibTransport_DoneState_INCOMPLETE;
}
}
else
{
@@ -3314,6 +3582,7 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu
switch (OPCODE)
{
case WEBSOCKET_OPCODE_CLOSE:
state->closed = 1;
ILibDuktape_DuplexStream_WriteEnd(state->decodedStream);
if (ILibIsRunningOnChainThread(state->chain) != 0 && state->encodedStream->writableStream->pipedReadable != NULL)
{
@@ -3363,7 +3632,7 @@ ILibTransport_DoneState ILibDuktape_httpStream_webSocket_EncodedWriteSink(ILibDu
void ILibDuktape_httpStream_webSocket_EncodedEndSink(ILibDuktape_DuplexStream *stream, void *user)
{
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
ILibDuktape_DuplexStream_WriteEnd(state->decodedStream);
if (!state->closed) { ILibDuktape_DuplexStream_WriteEnd(state->decodedStream); }
}
void ILibDuktape_httpStream_webSocket_EncodedPauseSink_Chain(void *chain, void *user)
{
@@ -3453,6 +3722,14 @@ void ILibDuktape_httpStream_webSocket_DecodedPauseSink_Chain(void *chain, void *
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->encodedStream->writableStream->ctx;
if (state->encodedStream->writableStream->pipedReadable == NULL)
{
// We're not piped yet, so just set a flag, and we'll make sure we don't resume
state->noResume = 1;
return;
}
duk_push_heapptr(ctx, state->encodedStream->writableStream->pipedReadable); // [readable]
duk_get_prop_string(ctx, -1, "pause"); // [readable][pause]
duk_swap_top(ctx, -2); // [pause][this]
@@ -3485,6 +3762,12 @@ void ILibDuktape_httpStream_webSocket_DecodedResumeSink_Chain(void *chain, void
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)user;
duk_context *ctx = state->encodedStream->writableStream->ctx;
if (state->encodedStream->writableStream->pipedReadable == NULL)
{
state->noResume = 0;
return;
}
duk_push_heapptr(ctx, state->encodedStream->writableStream->pipedReadable); // [readable]
duk_get_prop_string(ctx, -1, "resume"); // [readable][resume]
duk_swap_top(ctx, -2); // [resume][this]
@@ -3521,6 +3804,17 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_finalizer(duk_context *ctx)
{
void *chain = Duktape_GetChain(ctx);
duk_get_prop_string(ctx, 0, ILibDuktape_WebSocket_StatePtr);
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)Duktape_GetBuffer(ctx, -1, NULL);
if (state->encodedStream->writableStream->pipedReadable != NULL)
{
duk_push_heapptr(ctx, state->encodedStream->writableStream->pipedReadable); // [readable]
duk_get_prop_string(ctx, -1, "unpipe"); // [readable][unpipe]
duk_swap_top(ctx, -2); // [unpipe][this]
duk_push_heapptr(ctx, state->encodedStream->writableStream->obj); // [unpipe][this][ws]
duk_call_method(ctx, 1); duk_pop(ctx); // ...
}
ILibDuktape_InValidatePointer(chain, Duktape_GetBuffer(ctx, -1, NULL));
return(0);
}
@@ -3544,6 +3838,24 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_sendPong(duk_context *ctx)
ILibDuktape_httpStream_webSocket_WriteWebSocketPacket(state, WEBSOCKET_OPCODE_PONG, NULL, 0, ILibWebClient_WebSocket_FragmentFlag_Complete);
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_encodedPiped(duk_context *ctx)
{
// Someone Piped to the Encoded Stream
duk_push_this(ctx); // [ENC]
duk_get_prop_string(ctx, -1, ILibDuktape_WSENC2WS); // [ENC][WS]
duk_get_prop_string(ctx, -1, ILibDuktape_WebSocket_StatePtr); // [ENC][WS][state]
ILibDuktape_WebSocket_State *state = (ILibDuktape_WebSocket_State*)Duktape_GetBuffer(ctx, -1, NULL);
if (state->noResume)
{
state->noResume = 0;
duk_push_heapptr(state->ctx, state->encodedStream->writableStream->pipedReadable); // [Readable]
duk_get_prop_string(state->ctx, -1, "pause"); // [Readable][pause]
duk_swap_top(ctx, -2); // [pause][this]
duk_call_method(ctx, 0);
}
return(0);
}
duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
{
@@ -3562,6 +3874,10 @@ duk_ret_t ILibDuktape_httpStream_webSocketStream_new(duk_context *ctx)
duk_push_object(ctx); // [WebSocket][Encoded]
ILibDuktape_WriteID(ctx, "http.WebSocketStream.encoded");
state->encodedStream = ILibDuktape_DuplexStream_InitEx(ctx, ILibDuktape_httpStream_webSocket_EncodedWriteSink, ILibDuktape_httpStream_webSocket_EncodedEndSink, ILibDuktape_httpStream_webSocket_EncodedPauseSink, ILibDuktape_httpStream_webSocket_EncodedResumeSink, ILibDuktape_httpStream_webSocket_EncodedUnshiftSink, state);
ILibDuktape_EventEmitter_AddOnEx(ctx, -1, "pipe", ILibDuktape_httpStream_webSocketStream_encodedPiped);
duk_dup(ctx, -2); // [WebSocket][Encoded][WebSocket]
duk_put_prop_string(ctx, -2, ILibDuktape_WSENC2WS); // [WebSocket][Encoded]
ILibDuktape_CreateReadonlyProperty(ctx, "encoded"); // [WebSocket]
duk_push_object(ctx); // [WebSocket][Decoded]
ILibDuktape_WriteID(ctx, "http.WebSocketStream.decoded");