使用可寫流

作為一名 JavaScript 開發者,以程式設計方式向流寫入資料非常有用!本文將介紹 Streams API 的可寫流功能。

注意: 本文假定您已瞭解可寫流的用例,並且熟悉高階概念。如果不是,我們建議您先閱讀 Streams 概念和用法概述 以及專門的 Streams API 概念 文章,然後再返回。

注意: 如果您正在尋找有關可讀流的資訊,請嘗試改看 使用可讀流使用可讀位元組流

介紹一個示例

在我們的 dom-examples/streams 倉庫中,您可以找到一個 簡單的寫入器示例也可以線上檢視)。該示例將給定的訊息寫入一個可寫流,在寫入流的每個塊時在 UI 上顯示該塊,並在寫入完成後在 UI 上顯示整個訊息。

可寫流的工作原理

讓我們看看我們演示中的可寫流功能是如何工作的。

構造一個可寫流

要建立可寫流,我們使用 WritableStream() 建構函式;語法起初看起來很複雜,但實際上並不太難。

語法骨架如下所示

js
const stream = new WritableStream(
  {
    start(controller) {},
    write(chunk, controller) {},
    close(controller) {},
    abort(reason) {},
  },
  {
    highWaterMark: 3,
    size: () => 1,
  },
);

該建構函式接受兩個物件作為引數。第一個物件是必需的,它建立了一個 JavaScript 模型,表示正在寫入資料的底層接收器。第二個物件是可選的,它允許您為流指定一個自定義佇列策略,其形式為 ByteLengthQueuingStrategyCountQueuingStrategy 的例項。

第一個物件最多可以包含四個成員,所有成員都是可選的

  1. start(controller) — 在 WritableStream 構造完成後立即呼叫一次的方法。在此方法中,您應該包含設定流功能的程式碼,例如,訪問底層接收器。
  2. write(chunk,controller) — 每次有新塊準備好寫入底層接收器(在 chunk 引數中指定)時都會重複呼叫的方法。
  3. close(controller) — 如果應用程式發出訊號表示已完成向流寫入塊,則會呼叫此方法。它應該做任何必要的事情來最終確定對底層接收器的寫入,並釋放對其的訪問許可權。
  4. abort(reason) — 如果應用程式發出訊號表示希望突然關閉流並將其置於錯誤狀態,則會呼叫此方法。

我們示例中的建構函式呼叫如下所示

js
const decoder = new TextDecoder("utf-8");
const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
let result = "";
const writableStream = new WritableStream(
  {
    // Implement the sink
    write(chunk) {
      return new Promise((resolve, reject) => {
        const buffer = new ArrayBuffer(1);
        const view = new Uint8Array(buffer);
        view[0] = chunk;
        const decoded = decoder.decode(view, { stream: true });
        const listItem = document.createElement("li");
        listItem.textContent = `Chunk decoded: ${decoded}`;
        list.appendChild(listItem);
        result += decoded;
        resolve();
      });
    },
    close() {
      const listItem = document.createElement("li");
      listItem.textContent = `[MESSAGE RECEIVED] ${result}`;
      list.appendChild(listItem);
    },
    abort(err) {
      console.error("Sink error:", err);
    },
  },
  queuingStrategy,
);
  • write() 方法包含一個 Promise,其中包含將每個寫入的塊解碼為可在 UI 中寫入的格式的程式碼。這會在每個塊實際寫入時呼叫(請參閱下一節)。
  • 寫入完成後,close() 方法會自動呼叫——它會將整個解碼結果作為單個字串列印到 UI 中。
  • 如果流被中止,abort() 方法會將錯誤列印到控制檯。

寫入

要實際將內容寫入流,我們呼叫 sendMessage() 函式,並將要寫入的訊息和要寫入的流傳遞給它

js
sendMessage("Hello, world.", writableStream);

sendMessage() 的定義如下

js
function sendMessage(message, writableStream) {
  // defaultWriter is of type WritableStreamDefaultWriter
  const defaultWriter = writableStream.getWriter();
  const encoder = new TextEncoder();
  const encoded = encoder.encode(message);
  encoded.forEach((chunk) => {
    defaultWriter.ready
      .then(() => defaultWriter.write(chunk))
      .then(() => console.log("Chunk written to sink."))
      .catch((err) => console.error("Chunk error:", err));
  });
  // Call ready again to ensure that all chunks are written
  //   before closing the writer.
  defaultWriter.ready
    .then(() => defaultWriter.close())
    .then(() => console.log("All chunks written"))
    .catch((err) => console.error("Stream error:", err));
}

因此,在這裡我們使用 WritableStream.getWriter() 建立一個寫入器來將塊寫入流。這會建立一個 WritableStreamDefaultWriter 例項。

我們還使用相關的建構函式建立一個新的 TextEncoder 例項,將訊息編碼為要放入流的塊。

在塊編碼完成後,我們然後對結果陣列呼叫 forEach()。在此塊內,我們使用 WritableStreamDefaultWriter.ready 來檢查寫入器是否已準備好接受另一個塊。ready 返回一個 Promise,當滿足此條件時該 Promise 會 fulfilled,在該 Promise 內部,我們呼叫 WritableStreamDefaultWriter.write() 來實際將塊寫入流。正如上面討論的,這也觸發了在 WritableStream() 建構函式中指定的 write() 方法。

在所有塊都寫入完成後,我們再次執行 ready 檢查,以確保最後一個塊已完成寫入並且所有工作都已完成。當此 ready 檢查 fulfilled 時,我們呼叫 WritableStreamDefaultWriter.close() 來關閉流。正如上面討論的,這也觸發了在 WritableStream() 建構函式中指定的 close() 方法。

控制器

正如您在研究 WritableStream() 語法骨架時注意到的那樣,start()write()close() 方法可以選擇傳遞一個 controller 引數。它包含 WritableStreamDefaultController 介面的一個例項,開發者可以使用它來根據需要進一步控制流。

它目前只有一個可用的方法——WritableStreamDefaultController.error(),呼叫它會導致對流的未來互動出錯。當應用程式的其他部分出現問題時,這很有用,並且您希望將錯誤傳播到流中,以便整個系統能夠乾淨地失敗,而不是冒著將垃圾資料靜默寫入流(或類似糟糕的情況)。

關閉和中止

如上所述,當寫入完成時,我們呼叫 close() 方法,這會觸發在 WritableStream() 建構函式中指定的 close() 方法。

我們也可以透過呼叫 WritableStreamDefaultWriter.abort() 來中止流。

區別在於,當呼叫 close 時,任何先前入隊的塊都會在流關閉之前寫入並完成。

當呼叫 abort 時,任何先前入隊的塊都會立即被丟棄,然後流會進入錯誤狀態。這也會觸發在 WritableStream() 建構函式中指定的任何 abort() 方法被呼叫。