使用 WebSocketStream 編寫客戶端
WebSocketStream API 是 WebSocket 的一種基於 Promise 的替代方案,用於建立和使用客戶端 WebSocket 連線。WebSocketStream 使用 Streams API 來處理訊息的接收和傳送,這意味著套接字連線可以自動利用流的 backpressure(開發者無需額外操作),從而調節讀寫速度,避免應用程式中出現瓶頸。
本文將介紹如何使用 WebSocketStream API 建立 WebSocket 客戶端。
特性檢測
要檢查 WebSocketStream API 是否受支援,您可以使用以下方法:
if ("WebSocketStream" in self) {
// WebSocketStream is supported
}
建立 WebSocketStream 物件
要建立 WebSocket 客戶端,首先需要使用 WebSocketStream() 建構函式建立一個新的 WebSocketStream 例項。最簡單的形式是將其作為引數傳遞 WebSocket 伺服器的 URL。
const wss = new WebSocketStream("wss://example.com/wss");
它還可以接受一個包含自定義協議和/或 AbortSignal 的 options 物件。AbortSignal 可用於在 handshake 完成之前(即在 opened promise 解析之前)中止連線嘗試。它通常用於實現連線超時。例如,以下程式碼將在握手超過 5 秒才能完成時超時:
const controller = new AbortController();
const queueWSS = new WebSocketStream("wss://example.com/queue", {
protocols: ["amqp", "mqtt"],
signal: AbortSignal.timeout(5000),
});
傳送和接收資料
WebSocketStream 例項具有 opened 屬性——它返回一個 promise,一旦 WebSocket 連線成功開啟,該 promise 將以包含 ReadableStream 和 WritableStream 例項的物件進行解析。
const { readable, writable } = await wss.opened;
在這些物件上呼叫 getReader() 和 getWriter() 分別為我們提供了一個 ReadableStreamDefaultReader 和一個 WritableStreamDefaultWriter,它們可用於從套接字連線讀取和寫入資料。
const reader = readable.getReader();
const writer = writable.getWriter();
要將資料寫入套接字,可以使用 WritableStreamDefaultWriter.write()。
writer.write("My message");
要從套接字讀取資料,可以連續呼叫 ReadableStreamDefaultReader.read(),直到流結束,由 done 為 true 表示。
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
// Process value in some way
}
瀏覽器會自動控制客戶端接收和傳送資料的速率,在需要時應用 backpressure。如果資料到達速度快於客戶端 read() 的速度,底層的 Streams API 就會向伺服器施加 backpressure。此外,write() 操作只有在安全進行的情況下才會進行。
關閉連線
要關閉連線,請呼叫 WebSocketStream.close() 方法,可選地傳遞一個 closing code 和原因。
wss.close({
closeCode: 4000,
reason: "Night draws to a close",
});
注意: 根據伺服器的設定和您使用的狀態碼,伺服器可能會選擇忽略自定義程式碼,而採用與關閉原因相符的有效程式碼。
關閉底層的 WritableStream 或 WritableStreamDefaultWriter 也會關閉連線。
要處理連線關閉,請等待 closed promise 解析。
const { closeCode, reason } = await wss.closed;
完整的示例客戶端
為了演示 WebSocketStream 的基本用法,我們建立了一個示例客戶端。您可以在文章底部找到 full listing,並跟隨下面的解釋進行操作。
注意: 要使示例正常工作,您還需要一個伺服器元件。我們將客戶端編寫為與 Writing a WebSocket server in JavaScript (Deno) 中解釋的 Deno 伺服器配合使用,但任何相容的伺服器都可以。
演示的 HTML 如下。它包括資訊性的 <h2> 和 <p> 元素、一個用於關閉 WebSocket 連線(最初停用)的 <button>,以及一個供我們寫入輸出訊息的 <div>。
<h2>WebSocketStream Test</h2>
<p>Sends a ping every five seconds</p>
<button id="close" disabled>Close socket connection</button>
<div id="output"></div>
接下來是 JavaScript。首先,我們獲取輸出 <div> 和關閉 <button> 的引用,並定義一個將訊息寫入 <div> 的實用函式。
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");
function writeToScreen(message) {
const pElem = document.createElement("p");
pElem.textContent = message;
output.appendChild(pElem);
}
接下來,我們建立一個 if...else 結構來檢測 WebSocketStream 的支援情況,並在不支援的瀏覽器上輸出一條資訊性訊息。
if (!("WebSocketStream" in self)) {
writeToScreen("Your browser does not support WebSocketStream");
} else {
// supporting code path
}
在支援的程式碼路徑中,我們首先定義一個包含 WebSocket 伺服器 URL 的變數,並構造一個新的 WebSocketServer 例項。
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);
注意: 在生產應用程式中,最好使用安全的 WebSockets (wss://)。但是,在此演示中,我們連線到 localhost,因此需要使用非安全的 WebSocket 協議 (ws://) 才能使示例正常工作。
我們的程式碼的主要部分包含在 start() 函式中,我們定義它然後立即呼叫它。我們 await opened promise,然後在它解析後寫入一條訊息告知讀者連線成功,並從返回的 readable 和 writable 屬性建立 ReadableStreamDefaultReader 和 WritableStreamDefaultWriter 例項。
接下來,我們建立一個 start() 函式,該函式向伺服器傳送“ping”訊息並接收“pong”訊息作為響應,然後呼叫它。在函式體內,我們 await wss.opened promise,並從其解析值建立 reader 和 writer。一旦套接字開啟,我們將其告知使用者並啟用關閉按鈕。接下來,我們將 "ping" 值 write() 到套接字,並告知使用者。此時,伺服器將響應 "pong" 訊息。我們 await 響應的 read(),將其告知使用者,然後在 5 秒的延遲後再次向伺服器寫入 "ping"。這將無限期地繼續 "ping"/"pong" 迴圈。
async function start() {
const { readable, writable } = await wss.opened;
writeToScreen("CONNECTED");
closeBtn.disabled = false;
const reader = readable.getReader();
const writer = writable.getWriter();
writer.write("ping");
writeToScreen("SENT: ping");
while (true) {
const { value, done } = await reader.read();
writeToScreen(`RECEIVED: ${value}`);
if (done) {
break;
}
setTimeout(async () => {
try {
await writer.write("ping");
writeToScreen("SENT: ping");
} catch (e) {
writeToScreen(`Error writing to socket: ${e.message}`);
}
}, 5000);
}
}
start();
注意: setTimeout() 函式將 write() 呼叫包裝在 try...catch 塊中,以處理如果應用程式嘗試在流關閉後寫入流時可能出現的任何錯誤。
現在,我們包含一個 promise 樣式的程式碼段,以在 WebSocket 連線關閉時(由 closed promise 解析訊號)向用戶告知程式碼和原因。
wss.closed.then((result) => {
writeToScreen(
`DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
);
console.log("Socket closed", result.closeCode, result.reason);
});
最後,我們向關閉按鈕新增一個事件監聽器,該監聽器使用 close() 方法關閉連線,並附帶一個程式碼和自定義原因。該函式還會停用關閉按鈕——我們不希望使用者在連線已關閉後再次按下它。
closeBtn.addEventListener("click", () => {
wss.close({
closeCode: 1000,
reason: "That's all folks",
});
closeBtn.disabled = true;
});
完整列表
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");
function writeToScreen(message) {
const pElem = document.createElement("p");
pElem.textContent = message;
output.appendChild(pElem);
}
if (!("WebSocketStream" in self)) {
writeToScreen("Your browser does not support WebSocketStream");
} else {
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);
console.log(wss.url);
async function start() {
const { readable, writable, extensions, protocol } = await wss.opened;
writeToScreen("CONNECTED");
closeBtn.disabled = false;
const reader = readable.getReader();
const writer = writable.getWriter();
writer.write("ping");
writeToScreen("SENT: ping");
while (true) {
const { value, done } = await reader.read();
writeToScreen(`RECEIVED: ${value}`);
if (done) {
break;
}
setTimeout(() => {
writer.write("ping");
writeToScreen("SENT: ping");
}, 5000);
}
}
start();
wss.closed.then((result) => {
writeToScreen(
`DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
);
console.log("Socket closed", result.closeCode, result.reason);
});
closeBtn.addEventListener("click", () => {
wss.close({
closeCode: 1000,
reason: "That's all folks",
});
closeBtn.disabled = true;
});
}