Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -571,35 +571,22 @@ private async Task CancellationHelper(Task promise, CancellationToken cancellati
lock (_lockObject)
{
var state = State;
ForceReadCloseStatusLocked();
if (state == WebSocketState.Aborted)
{
ForceReadCloseStatusLocked();
throw new OperationCanceledException(nameof(WebSocketState.Aborted), ex);
}
if (ex is OperationCanceledException)
if (ex is OperationCanceledException || cancellationToken.IsCancellationRequested || ex.Message == "Error: OperationCanceledException")
{
if(state != WebSocketState.Closed)
{
FastState = WebSocketState.Aborted;
}
_cancelled = true;
throw;
}
if (state != WebSocketState.Closed && cancellationToken.IsCancellationRequested)
{
FastState = WebSocketState.Aborted;
_cancelled = true;
throw new OperationCanceledException(cancellationToken);
}
if (state != WebSocketState.Closed && ex.Message == "Error: OperationCanceledException")
{
FastState = WebSocketState.Aborted;
_cancelled = true;
throw new OperationCanceledException("The operation was cancelled.", ex, cancellationToken);
}
if (previousState == WebSocketState.Connecting)
{
ForceReadCloseStatusLocked();
throw new WebSocketException(WebSocketError.Faulted, SR.net_webstatus_ConnectFailure, ex);
}
throw new WebSocketException(WebSocketError.NativeError, ex);
Expand Down
8 changes: 4 additions & 4 deletions src/mono/browser/runtime/loader/exit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,18 +274,18 @@ function logOnExit(exit_code: number, reason: any) {
}
}
function unhandledrejection_handler(event: any) {
fatal_handler(event, event.reason);
fatal_handler(event, event.reason, "rejection");
}

function error_handler(event: any) {
fatal_handler(event, event.error);
fatal_handler(event, event.error, "error");
}

function fatal_handler(event: any, reason: any) {
function fatal_handler(event: any, reason: any, type: string) {
event.preventDefault();
try {
if (!reason) {
reason = new Error("Unhandled");
reason = new Error("Unhandled " + type);
}
if (reason.stack === undefined) {
reason.stack = new Error().stack;
Expand Down
173 changes: 107 additions & 66 deletions src/mono/browser/runtime/web-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const wasm_ws_pending_receive_event_queue = Symbol.for("wasm ws_pending_receive_
const wasm_ws_pending_receive_promise_queue = Symbol.for("wasm ws_pending_receive_promise_queue");
const wasm_ws_pending_open_promise = Symbol.for("wasm ws_pending_open_promise");
const wasm_ws_pending_open_promise_used = Symbol.for("wasm wasm_ws_pending_open_promise_used");
const wasm_ws_pending_error = Symbol.for("wasm wasm_ws_pending_error");
const wasm_ws_pending_close_promises = Symbol.for("wasm ws_pending_close_promises");
const wasm_ws_pending_send_promises = Symbol.for("wasm ws_pending_send_promises");
const wasm_ws_is_aborted = Symbol.for("wasm ws_is_aborted");
Expand Down Expand Up @@ -51,9 +52,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece
try {
ws = new globalThis.WebSocket(uri, sub_protocols || undefined) as WebSocketExtension;
}
catch (e) {
mono_log_warn("WebSocket error", e);
throw e;
catch (error: any) {
mono_log_warn("WebSocket error in ws_wasm_create: " + error.toString());
throw error;
}
const { promise_control: open_promise_control } = createPromiseController<WebSocketExtension>();

Expand All @@ -65,49 +66,69 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece
ws[wasm_ws_receive_status_ptr] = receive_status_ptr;
ws.binaryType = "arraybuffer";
const local_on_open = () => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
open_promise_control.resolve(ws);
prevent_timer_throttling();
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
open_promise_control.resolve(ws);
prevent_timer_throttling();
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket open event: " + error.toString());
}
};
const local_on_message = (ev: MessageEvent) => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
_mono_wasm_web_socket_on_message(ws, ev);
prevent_timer_throttling();
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
web_socket_on_message(ws, ev);
prevent_timer_throttling();
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket message event: " + error.toString());
}
};
const local_on_close = (ev: CloseEvent) => {
ws.removeEventListener("message", local_on_message);
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
try {
ws.removeEventListener("message", local_on_message);
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;

ws[wasm_ws_close_received] = true;
ws["close_status"] = ev.code;
ws["close_status_description"] = ev.reason;
ws[wasm_ws_close_received] = true;
ws["close_status"] = ev.code;
ws["close_status_description"] = ev.reason;

// this reject would not do anything if there was already "open" before it.
open_promise_control.reject(new Error(ev.reason));
if (ws[wasm_ws_pending_open_promise_used]) {
open_promise_control.reject(new Error(ev.reason));
}

for (const close_promise_control of ws[wasm_ws_pending_close_promises]) {
close_promise_control.resolve();
}
for (const close_promise_control of ws[wasm_ws_pending_close_promises]) {
close_promise_control.resolve();
}

// send close to any pending receivers, to wake them
const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue];
receive_promise_queue.drain((receive_promise_control) => {
setI32(receive_status_ptr, 0); // count
setI32(<any>receive_status_ptr + 4, 2); // type:close
setI32(<any>receive_status_ptr + 8, 1);// end_of_message: true
receive_promise_control.resolve();
});
// send close to any pending receivers, to wake them
const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue];
receive_promise_queue.drain((receive_promise_control) => {
setI32(receive_status_ptr, 0); // count
setI32(<any>receive_status_ptr + 4, 2); // type:close
setI32(<any>receive_status_ptr + 8, 1);// end_of_message: true
receive_promise_control.resolve();
});
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket close event: " + error.toString());
}
};
const local_on_error = (ev: any) => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
ws.removeEventListener("message", local_on_message);
const error = new Error(ev.message || "WebSocket error");
mono_log_warn("WebSocket error", error);
reject_promises(ws, error);
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
ws.removeEventListener("message", local_on_message);
const message = ev.message
? "WebSocket error: " + ev.message
: "WebSocket error";
mono_log_warn(message);
ws[wasm_ws_pending_error] = message;
reject_promises(ws, new Error(message));
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket error event: " + error.toString());
}
};
ws.addEventListener("message", local_on_message);
ws.addEventListener("open", local_on_open, { once: true });
Expand All @@ -126,6 +147,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece

export function ws_wasm_open(ws: WebSocketExtension): Promise<WebSocketExtension> | null {
mono_assert(!!ws, "ERR17: expected ws instance");
if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
const open_promise_control = ws[wasm_ws_pending_open_promise];
ws[wasm_ws_pending_open_promise_used] = true;
return open_promise_control.promise;
Expand All @@ -134,6 +158,9 @@ export function ws_wasm_open(ws: WebSocketExtension): Promise<WebSocketExtension
export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer_length: number, message_type: number, end_of_message: boolean): Promise<void> | null {
mono_assert(!!ws, "ERR17: expected ws instance");

if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent]) {
return rejectedPromise("InvalidState: The WebSocket is not connected.");
}
Expand All @@ -144,18 +171,22 @@ export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer
}

const buffer_view = new Uint8Array(localHeapViewU8().buffer, <any>buffer_ptr, buffer_length);
const whole_buffer = _mono_wasm_web_socket_send_buffering(ws, buffer_view, message_type, end_of_message);
const whole_buffer = web_socket_send_buffering(ws, buffer_view, message_type, end_of_message);

if (!end_of_message || !whole_buffer) {
return resolvedPromise();
}

return _mono_wasm_web_socket_send_and_wait(ws, whole_buffer);
return web_socket_send_and_wait(ws, whole_buffer);
}

export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer_length: number): Promise<void> | null {
mono_assert(!!ws, "ERR18: expected ws instance");

if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}

// we can't quickly return if wasm_ws_close_received==true, because there could be pending messages
if (ws[wasm_ws_is_aborted]) {
const receive_status_ptr = ws[wasm_ws_receive_status_ptr];
Expand All @@ -171,7 +202,7 @@ export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buf
if (receive_event_queue.getLength()) {
mono_assert(receive_promise_queue.getLength() == 0, "ERR20: Invalid WS state");

_mono_wasm_web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length);
web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length);

return resolvedPromise();
}
Expand Down Expand Up @@ -200,6 +231,9 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri
if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent] || ws.readyState == WebSocket.CLOSED) {
return resolvedPromise();
}
if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
ws[wasm_ws_close_sent] = true;
if (wait_for_close_received) {
const { promise, promise_control } = createPromiseController<void>();
Expand Down Expand Up @@ -235,8 +269,8 @@ export function ws_wasm_abort(ws: WebSocketExtension): void {
try {
// this is different from Managed implementation
ws.close(1000, "Connection was aborted.");
} catch (error) {
mono_log_warn("WebSocket error while aborting", error);
} catch (error: any) {
mono_log_warn("WebSocket error in ws_wasm_abort: " + error.toString());
}
}

Expand All @@ -263,7 +297,7 @@ function reject_promises(ws: WebSocketExtension, error: Error) {
}

// send and return promise
function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise<void> | null {
function web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise<void> | null {
ws.send(buffer_view);
ws[wasm_ws_pending_send_buffer] = null;

Expand All @@ -281,28 +315,34 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view

let nextDelay = 1;
const polling_check = () => {
// was it all sent yet ?
if (ws.bufferedAmount === 0) {
promise_control.resolve();
}
else {
const readyState = ws.readyState;
if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) {
// only reject if the data were not sent
// bufferedAmount does not reset to zero once the connection closes
promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`));
try {
// was it all sent yet ?
if (ws.bufferedAmount === 0) {
promise_control.resolve();
}
else {
const readyState = ws.readyState;
if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) {
// only reject if the data were not sent
// bufferedAmount does not reset to zero once the connection closes
promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`));
}
else if (!promise_control.isDone) {
globalThis.setTimeout(polling_check, nextDelay);
// exponentially longer delays, up to 1000ms
nextDelay = Math.min(nextDelay * 1.5, 1000);
return;
}
}
else if (!promise_control.isDone) {
globalThis.setTimeout(polling_check, nextDelay);
// exponentially longer delays, up to 1000ms
nextDelay = Math.min(nextDelay * 1.5, 1000);
return;
// remove from pending
const index = pending.indexOf(promise_control);
if (index > -1) {
pending.splice(index, 1);
}
}
// remove from pending
const index = pending.indexOf(promise_control);
if (index > -1) {
pending.splice(index, 1);
catch (error: any) {
mono_log_warn("WebSocket error in web_socket_send_and_wait: " + error.toString());
promise_control.reject(error);
}
};

Expand All @@ -311,7 +351,7 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view
return promise;
}

function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) {
function web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) {
const event_queue = ws[wasm_ws_pending_receive_event_queue];
const promise_queue = ws[wasm_ws_pending_receive_promise_queue];

Expand Down Expand Up @@ -340,14 +380,14 @@ function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: Message
}
while (promise_queue.getLength() && event_queue.getLength()) {
const promise_control = promise_queue.dequeue()!;
_mono_wasm_web_socket_receive_buffering(ws, event_queue,
web_socket_receive_buffering(ws, event_queue,
promise_control.buffer_ptr, promise_control.buffer_length);
promise_control.resolve();
}
prevent_timer_throttling();
}

function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue<any>, buffer_ptr: VoidPtr, buffer_length: number) {
function web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue<any>, buffer_ptr: VoidPtr, buffer_length: number) {
const event = event_queue.peek();

const count = Math.min(buffer_length, event.data.length - event.offset);
Expand All @@ -367,7 +407,7 @@ function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_q
setI32(<any>response_ptr + 8, end_of_message);
}

function _mono_wasm_web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null {
function web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null {
let buffer = ws[wasm_ws_pending_send_buffer];
let offset = 0;
const length = buffer_view.byteLength;
Expand Down Expand Up @@ -438,6 +478,7 @@ type WebSocketExtension = WebSocket & {
[wasm_ws_pending_open_promise_used]: boolean
[wasm_ws_pending_send_promises]: PromiseController<void>[]
[wasm_ws_pending_close_promises]: PromiseController<void>[]
[wasm_ws_pending_error]: string | undefined
[wasm_ws_is_aborted]: boolean
[wasm_ws_close_received]: boolean
[wasm_ws_close_sent]: boolean
Expand Down Expand Up @@ -476,7 +517,7 @@ function resolvedPromise(): Promise<void> | null {
}
}

function rejectedPromise(message: string): Promise<void> | null {
function rejectedPromise(message: string): Promise<any> | null {
const resolved = Promise.reject(new Error(message));
return wrap_as_cancelable<void>(resolved);
}