使用可寫流
作為一名 JavaScript 開發者,以程式設計方式向流寫入資料非常有用!本文將介紹 Streams API 的可寫流功能。
注意: 本文假定您已瞭解可寫流的用例,並且熟悉高階概念。如果不是,我們建議您先閱讀 Streams 概念和用法概述 以及專門的 Streams API 概念 文章,然後再返回。
介紹一個示例
在我們的 dom-examples/streams 倉庫中,您可以找到一個 簡單的寫入器示例(也可以線上檢視)。該示例將給定的訊息寫入一個可寫流,在寫入流的每個塊時在 UI 上顯示該塊,並在寫入完成後在 UI 上顯示整個訊息。
可寫流的工作原理
讓我們看看我們演示中的可寫流功能是如何工作的。
構造一個可寫流
要建立可寫流,我們使用 WritableStream() 建構函式;語法起初看起來很複雜,但實際上並不太難。
語法骨架如下所示
const stream = new WritableStream(
{
start(controller) {},
write(chunk, controller) {},
close(controller) {},
abort(reason) {},
},
{
highWaterMark: 3,
size: () => 1,
},
);
該建構函式接受兩個物件作為引數。第一個物件是必需的,它建立了一個 JavaScript 模型,表示正在寫入資料的底層接收器。第二個物件是可選的,它允許您為流指定一個自定義佇列策略,其形式為 ByteLengthQueuingStrategy 或 CountQueuingStrategy 的例項。
第一個物件最多可以包含四個成員,所有成員都是可選的
start(controller)— 在WritableStream構造完成後立即呼叫一次的方法。在此方法中,您應該包含設定流功能的程式碼,例如,訪問底層接收器。write(chunk,controller)— 每次有新塊準備好寫入底層接收器(在chunk引數中指定)時都會重複呼叫的方法。close(controller)— 如果應用程式發出訊號表示已完成向流寫入塊,則會呼叫此方法。它應該做任何必要的事情來最終確定對底層接收器的寫入,並釋放對其的訪問許可權。abort(reason)— 如果應用程式發出訊號表示希望突然關閉流並將其置於錯誤狀態,則會呼叫此方法。
我們示例中的建構函式呼叫如下所示
const decoder = new TextDecoder("utf-8");
const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
let result = "";
const writableStream = new WritableStream(
{
// Implement the sink
write(chunk) {
return new Promise((resolve, reject) => {
const buffer = new ArrayBuffer(1);
const view = new Uint8Array(buffer);
view[0] = chunk;
const decoded = decoder.decode(view, { stream: true });
const listItem = document.createElement("li");
listItem.textContent = `Chunk decoded: ${decoded}`;
list.appendChild(listItem);
result += decoded;
resolve();
});
},
close() {
const listItem = document.createElement("li");
listItem.textContent = `[MESSAGE RECEIVED] ${result}`;
list.appendChild(listItem);
},
abort(err) {
console.error("Sink error:", err);
},
},
queuingStrategy,
);
write()方法包含一個 Promise,其中包含將每個寫入的塊解碼為可在 UI 中寫入的格式的程式碼。這會在每個塊實際寫入時呼叫(請參閱下一節)。- 寫入完成後,
close()方法會自動呼叫——它會將整個解碼結果作為單個字串列印到 UI 中。 - 如果流被中止,
abort()方法會將錯誤列印到控制檯。
寫入
要實際將內容寫入流,我們呼叫 sendMessage() 函式,並將要寫入的訊息和要寫入的流傳遞給它
sendMessage("Hello, world.", writableStream);
sendMessage() 的定義如下
function sendMessage(message, writableStream) {
// defaultWriter is of type WritableStreamDefaultWriter
const defaultWriter = writableStream.getWriter();
const encoder = new TextEncoder();
const encoded = encoder.encode(message);
encoded.forEach((chunk) => {
defaultWriter.ready
.then(() => defaultWriter.write(chunk))
.then(() => console.log("Chunk written to sink."))
.catch((err) => console.error("Chunk error:", err));
});
// Call ready again to ensure that all chunks are written
// before closing the writer.
defaultWriter.ready
.then(() => defaultWriter.close())
.then(() => console.log("All chunks written"))
.catch((err) => console.error("Stream error:", err));
}
因此,在這裡我們使用 WritableStream.getWriter() 建立一個寫入器來將塊寫入流。這會建立一個 WritableStreamDefaultWriter 例項。
我們還使用相關的建構函式建立一個新的 TextEncoder 例項,將訊息編碼為要放入流的塊。
在塊編碼完成後,我們然後對結果陣列呼叫 forEach()。在此塊內,我們使用 WritableStreamDefaultWriter.ready 來檢查寫入器是否已準備好接受另一個塊。ready 返回一個 Promise,當滿足此條件時該 Promise 會 fulfilled,在該 Promise 內部,我們呼叫 WritableStreamDefaultWriter.write() 來實際將塊寫入流。正如上面討論的,這也觸發了在 WritableStream() 建構函式中指定的 write() 方法。
在所有塊都寫入完成後,我們再次執行 ready 檢查,以確保最後一個塊已完成寫入並且所有工作都已完成。當此 ready 檢查 fulfilled 時,我們呼叫 WritableStreamDefaultWriter.close() 來關閉流。正如上面討論的,這也觸發了在 WritableStream() 建構函式中指定的 close() 方法。
控制器
正如您在研究 WritableStream() 語法骨架時注意到的那樣,start()、write() 和 close() 方法可以選擇傳遞一個 controller 引數。它包含 WritableStreamDefaultController 介面的一個例項,開發者可以使用它來根據需要進一步控制流。
它目前只有一個可用的方法——WritableStreamDefaultController.error(),呼叫它會導致對流的未來互動出錯。當應用程式的其他部分出現問題時,這很有用,並且您希望將錯誤傳播到流中,以便整個系統能夠乾淨地失敗,而不是冒著將垃圾資料靜默寫入流(或類似糟糕的情況)。
關閉和中止
如上所述,當寫入完成時,我們呼叫 close() 方法,這會觸發在 WritableStream() 建構函式中指定的 close() 方法。
我們也可以透過呼叫 WritableStreamDefaultWriter.abort() 來中止流。
區別在於,當呼叫 close 時,任何先前入隊的塊都會在流關閉之前寫入並完成。
當呼叫 abort 時,任何先前入隊的塊都會立即被丟棄,然後流會進入錯誤狀態。這也會觸發在 WritableStream() 建構函式中指定的任何 abort() 方法被呼叫。