使用 WebSocketStream 編寫客戶端

WebSocketStream API 是 WebSocket 的一種基於 Promise 的替代方案,用於建立和使用客戶端 WebSocket 連線。WebSocketStream 使用 Streams API 來處理訊息的接收和傳送,這意味著套接字連線可以自動利用流的 backpressure(開發者無需額外操作),從而調節讀寫速度,避免應用程式中出現瓶頸。

本文將介紹如何使用 WebSocketStream API 建立 WebSocket 客戶端。

特性檢測

要檢查 WebSocketStream API 是否受支援,您可以使用以下方法:

js
if ("WebSocketStream" in self) {
  // WebSocketStream is supported
}

建立 WebSocketStream 物件

要建立 WebSocket 客戶端,首先需要使用 WebSocketStream() 建構函式建立一個新的 WebSocketStream 例項。最簡單的形式是將其作為引數傳遞 WebSocket 伺服器的 URL。

js
const wss = new WebSocketStream("wss://example.com/wss");

它還可以接受一個包含自定義協議和/或 AbortSignaloptions 物件。AbortSignal 可用於在 handshake 完成之前(即在 opened promise 解析之前)中止連線嘗試。它通常用於實現連線超時。例如,以下程式碼將在握手超過 5 秒才能完成時超時:

js
const controller = new AbortController();
const queueWSS = new WebSocketStream("wss://example.com/queue", {
  protocols: ["amqp", "mqtt"],
  signal: AbortSignal.timeout(5000),
});

傳送和接收資料

WebSocketStream 例項具有 opened 屬性——它返回一個 promise,一旦 WebSocket 連線成功開啟,該 promise 將以包含 ReadableStreamWritableStream 例項的物件進行解析。

js
const { readable, writable } = await wss.opened;

在這些物件上呼叫 getReader()getWriter() 分別為我們提供了一個 ReadableStreamDefaultReader 和一個 WritableStreamDefaultWriter,它們可用於從套接字連線讀取和寫入資料。

js
const reader = readable.getReader();
const writer = writable.getWriter();

要將資料寫入套接字,可以使用 WritableStreamDefaultWriter.write()

js
writer.write("My message");

要從套接字讀取資料,可以連續呼叫 ReadableStreamDefaultReader.read(),直到流結束,由 donetrue 表示。

js
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }

  // Process value in some way
}

瀏覽器會自動控制客戶端接收和傳送資料的速率,在需要時應用 backpressure。如果資料到達速度快於客戶端 read() 的速度,底層的 Streams API 就會向伺服器施加 backpressure。此外,write() 操作只有在安全進行的情況下才會進行。

關閉連線

要關閉連線,請呼叫 WebSocketStream.close() 方法,可選地傳遞一個 closing code 和原因。

js
wss.close({
  closeCode: 4000,
  reason: "Night draws to a close",
});

注意: 根據伺服器的設定和您使用的狀態碼,伺服器可能會選擇忽略自定義程式碼,而採用與關閉原因相符的有效程式碼。

關閉底層的 WritableStreamWritableStreamDefaultWriter 也會關閉連線。

要處理連線關閉,請等待 closed promise 解析。

js
const { closeCode, reason } = await wss.closed;

完整的示例客戶端

為了演示 WebSocketStream 的基本用法,我們建立了一個示例客戶端。您可以在文章底部找到 full listing,並跟隨下面的解釋進行操作。

注意: 要使示例正常工作,您還需要一個伺服器元件。我們將客戶端編寫為與 Writing a WebSocket server in JavaScript (Deno) 中解釋的 Deno 伺服器配合使用,但任何相容的伺服器都可以。

演示的 HTML 如下。它包括資訊性的 <h2><p> 元素、一個用於關閉 WebSocket 連線(最初停用)的 <button>,以及一個供我們寫入輸出訊息的 <div>

html
<h2>WebSocketStream Test</h2>
<p>Sends a ping every five seconds</p>
<button id="close" disabled>Close socket connection</button>
<div id="output"></div>

接下來是 JavaScript。首先,我們獲取輸出 <div> 和關閉 <button> 的引用,並定義一個將訊息寫入 <div> 的實用函式。

js
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");

function writeToScreen(message) {
  const pElem = document.createElement("p");
  pElem.textContent = message;
  output.appendChild(pElem);
}

接下來,我們建立一個 if...else 結構來檢測 WebSocketStream 的支援情況,並在不支援的瀏覽器上輸出一條資訊性訊息。

js
if (!("WebSocketStream" in self)) {
  writeToScreen("Your browser does not support WebSocketStream");
} else {
  // supporting code path
}

在支援的程式碼路徑中,我們首先定義一個包含 WebSocket 伺服器 URL 的變數,並構造一個新的 WebSocketServer 例項。

js
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);

注意: 在生產應用程式中,最好使用安全的 WebSockets (wss://)。但是,在此演示中,我們連線到 localhost,因此需要使用非安全的 WebSocket 協議 (ws://) 才能使示例正常工作。

我們的程式碼的主要部分包含在 start() 函式中,我們定義它然後立即呼叫它。我們 await opened promise,然後在它解析後寫入一條訊息告知讀者連線成功,並從返回的 readablewritable 屬性建立 ReadableStreamDefaultReaderWritableStreamDefaultWriter 例項。

接下來,我們建立一個 start() 函式,該函式向伺服器傳送“ping”訊息並接收“pong”訊息作為響應,然後呼叫它。在函式體內,我們 await wss.opened promise,並從其解析值建立 reader 和 writer。一旦套接字開啟,我們將其告知使用者並啟用關閉按鈕。接下來,我們將 "ping"write() 到套接字,並告知使用者。此時,伺服器將響應 "pong" 訊息。我們 await 響應的 read(),將其告知使用者,然後在 5 秒的延遲後再次向伺服器寫入 "ping"。這將無限期地繼續 "ping"/"pong" 迴圈。

js
async function start() {
  const { readable, writable } = await wss.opened;
  writeToScreen("CONNECTED");
  closeBtn.disabled = false;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  writer.write("ping");
  writeToScreen("SENT: ping");

  while (true) {
    const { value, done } = await reader.read();
    writeToScreen(`RECEIVED: ${value}`);
    if (done) {
      break;
    }

    setTimeout(async () => {
      try {
        await writer.write("ping");
        writeToScreen("SENT: ping");
      } catch (e) {
        writeToScreen(`Error writing to socket: ${e.message}`);
      }
    }, 5000);
  }
}

start();

注意: setTimeout() 函式將 write() 呼叫包裝在 try...catch 塊中,以處理如果應用程式嘗試在流關閉後寫入流時可能出現的任何錯誤。

現在,我們包含一個 promise 樣式的程式碼段,以在 WebSocket 連線關閉時(由 closed promise 解析訊號)向用戶告知程式碼和原因。

js
wss.closed.then((result) => {
  writeToScreen(
    `DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
  );
  console.log("Socket closed", result.closeCode, result.reason);
});

最後,我們向關閉按鈕新增一個事件監聽器,該監聽器使用 close() 方法關閉連線,並附帶一個程式碼和自定義原因。該函式還會停用關閉按鈕——我們不希望使用者在連線已關閉後再次按下它。

js
closeBtn.addEventListener("click", () => {
  wss.close({
    closeCode: 1000,
    reason: "That's all folks",
  });

  closeBtn.disabled = true;
});

完整列表

js
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");

function writeToScreen(message) {
  const pElem = document.createElement("p");
  pElem.textContent = message;
  output.appendChild(pElem);
}

if (!("WebSocketStream" in self)) {
  writeToScreen("Your browser does not support WebSocketStream");
} else {
  const wsURL = "ws://127.0.0.1/";
  const wss = new WebSocketStream(wsURL);

  console.log(wss.url);

  async function start() {
    const { readable, writable, extensions, protocol } = await wss.opened;
    writeToScreen("CONNECTED");
    closeBtn.disabled = false;
    const reader = readable.getReader();
    const writer = writable.getWriter();

    writer.write("ping");
    writeToScreen("SENT: ping");

    while (true) {
      const { value, done } = await reader.read();
      writeToScreen(`RECEIVED: ${value}`);
      if (done) {
        break;
      }

      setTimeout(() => {
        writer.write("ping");
        writeToScreen("SENT: ping");
      }, 5000);
    }
  }

  start();

  wss.closed.then((result) => {
    writeToScreen(
      `DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
    );
    console.log("Socket closed", result.closeCode, result.reason);
  });

  closeBtn.addEventListener("click", () => {
    wss.close({
      closeCode: 1000,
      reason: "That's all folks",
    });

    closeBtn.disabled = true;
  });
}