使用可讀流
作為一名 JavaScript 開發者,以塊的形式,透過程式設計方式讀取和操作透過網路接收到的資料流非常有用!但如何使用 Streams API 的可讀流功能呢?本文將解釋其基礎知識。
注意:本文假定你瞭解可讀流的使用場景,並且熟悉其高層概念。如果不瞭解,我們建議你先閱讀流的概念和使用概述以及專門的Streams API 概念文章,然後再回來。
注意:如果你正在尋找有關可寫流的資訊,請嘗試閱讀使用可寫流。
查詢一些示例
本文將探討各種示例,這些示例均取自我們的 dom-examples/streams 倉庫。你可以在其中找到完整的原始碼以及示例連結。
以流的形式消費 fetch 請求
Fetch API 允許你透過網路獲取資源,它提供了 XHR 的現代替代方案。它有許多優點,而它真正棒的地方在於,瀏覽器最近添加了將 fetch 響應作為可讀流消費的功能。
Request.body 和 Response.body 屬性是可用的,它們是暴露正文內容作為可讀流的 getter。
正如我們的簡單流泵示例所示(也可在此處檢視即時示例),暴露它只需要訪問響應的 body 屬性即可。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => response.body);
這為我們提供了一個 ReadableStream 物件。
附加一個讀取器
現在我們已經有了流式主體,讀取流需要向其附加一個讀取器。這是透過 ReadableStream.getReader() 方法完成的。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => response.body)
.then((body) => {
const reader = body.getReader();
// …
});
呼叫此方法會建立一個讀取器並將其鎖定到流中——在釋放此讀取器之前,例如透過呼叫 ReadableStreamDefaultReader.releaseLock(),其他任何讀取器都無法讀取此流。
另外請注意,前面的示例可以減少一步,因為 response.body 是同步的,所以不需要 Promise。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
// …
});
讀取流
現在你已經附加了讀取器,可以使用 ReadableStreamDefaultReader.read() 方法從流中讀取資料塊。這會從流中讀取一個數據塊,你可以對其進行任何操作。例如,我們的簡單流泵示例接著將每個資料塊排入一個新的自定義 ReadableStream 中(我們將在下一節中詳細介紹),然後從中建立一個新的 Response,將其作為 Blob 消費,使用 URL.createObjectURL() 從該 Blob 建立一個物件 URL,然後將其顯示在 <img> 元素中,從而有效地建立了我們最初獲取的影像的副本。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
return new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
});
})
// Create a new response out of the stream
.then((stream) => new Response(stream))
// Create an object URL for the response
.then((response) => response.blob())
.then((blob) => URL.createObjectURL(blob))
// Update image
.then((url) => console.log((image.src = url)))
.catch((err) => console.error(err));
讓我們詳細看看 read() 是如何使用的。在上面看到的 pump() 函式中,我們首先呼叫 read(),它返回一個包含結果物件的 Promise——這個物件包含我們讀取的結果,形式為 { done, value }。
reader.read().then(({ done, value }) => {
/* … */
});
結果可以是三種不同型別之一
- 如果有一個塊可供讀取,Promise 將以形如
{ value: theChunk, done: false }的物件解析。 - 如果流關閉,Promise 將以形如
{ value: undefined, done: true }的物件解析。 - 如果流出錯,Promise 將以相關的錯誤拒絕。
接下來,我們檢查 done 是否為 true。如果是,則沒有更多塊可讀(值為 undefined),因此我們從函式返回,並使用 ReadableStreamDefaultController.close() 關閉自定義流。
if (done) {
controller.close();
return;
}
注意:close() 是新自定義流的一部分,而不是我們在此討論的原始流。我們將在下一節中詳細解釋自定義流。
如果 done 不為 true,我們處理已讀取的新塊(包含在結果物件的 value 屬性中),然後再次呼叫 pump() 函式以讀取下一個塊。
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
這是使用流讀取器時你會看到的標準模式。
- 你編寫一個函式,它從讀取流開始。
- 如果流中沒有更多內容可讀,則從函式返回。
- 如果流中有更多內容可讀,則處理當前塊,然後再次執行該函式。
- 你會持續鏈式呼叫
pump()函式,直到沒有更多流可讀,此時將遵循步驟 2。
刪除所有實際執行“泵送”的程式碼,程式碼可以概括為類似這樣:
fetch("http://example.com/somefile.txt")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
// read() returns a promise that resolves when a value has been received
reader.read().then(function pump({ done, value }) {
if (done) {
// Do something with last chunk of data then exit reader
return;
}
// Otherwise do something here to process current chunk
// Read some more, and call this function again
return reader.read().then(pump);
});
})
.catch((err) => console.error(err));
注意:該函式看起來像是 pump() 呼叫自身並導致潛在的深度遞迴。然而,由於 pump 是非同步的,並且每個 pump() 呼叫都在 promise 處理程式的末尾,所以它實際上類似於一個 promise 處理程式鏈。
當使用 async/await 而不是 Promise 編寫時,讀取流甚至更容易。
async function readData(url) {
const response = await fetch(url);
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
// Do something with last chunk of data then exit reader
return;
}
// Otherwise do something here to process current chunk
}
}
使用非同步迭代消費 fetch()
還有一種更簡單的方法來消費 fetch(),那就是使用 for await...of 語法迭代返回的 response.body。這之所以有效,是因為 response.body 返回一個 ReadableStream,它是一個非同步可迭代物件。
使用這種方法,上一節中的示例程式碼可以重寫為如下所示:
async function readData(url) {
const response = await fetch(url);
for await (const chunk of response.body) {
// Do something with each "chunk"
}
// Exit when done
}
如果你想停止迭代流,可以使用 AbortController 及其關聯的 AbortSignal 來取消 fetch() 操作。
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
const response = await fetch(url, { signal });
for await (const chunk of response.body) {
// Do something with the chunk
}
}
或者,你可以使用 break 退出迴圈,如以下程式碼所示。請注意,迴圈中的程式碼僅在流有新資料需要處理時才會執行,因此從訊號中止到呼叫 break 之間可能會有一些延遲。
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
const response = await fetch(url);
for await (const chunk of response.body) {
if (signal.aborted) break; // just break out of loop
// Do something with the chunk
}
}
非同步讀取器示例
下面的程式碼展示了一個更完整的示例。在這裡,fetch 流在 try/catch 塊內部使用迭代器進行消費。在迴圈的每次迭代中,程式碼只是簡單地記錄和計數接收到的位元組。如果出現錯誤,它會記錄問題。fetch() 操作可以使用 AbortSignal 取消,這也會被記錄為錯誤。
let bytes = 0;
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
try {
const response = await fetch(url, signal);
for await (const chunk of response.body) {
if (signal.aborted) throw signal.reason;
bytes += chunk.length;
logConsumer(`Chunk: ${chunk}. Read ${bytes} characters.`);
}
} catch (e) {
if (e instanceof TypeError) {
console.log(e);
logConsumer("TypeError: Browser may not support async iteration");
} else {
logConsumer(`Error in async iterator: ${e}.`);
}
}
}
下面的示例日誌顯示程式碼正在執行,或者報告你的瀏覽器不支援 ReadableStream 的非同步迭代。右側顯示了接收到的資料塊;你可以按下取消按鈕來停止獲取。
注意:為了演示目的,此 fetch 操作是模擬的,它只返回一個生成隨機文字塊的 ReadableStream。左側的“底層源”是模擬源中正在生成的資料,而右側的列是來自消費者的日誌。(模擬源的程式碼未顯示,因為它與示例無關。)
建立你自己的自定義可讀流
本文一直在研究的簡單流泵示例包含第二部分——一旦我們從 fetch 主體中分塊讀取了影像,我們就會將它們排入我們自己建立的另一個自定義流中。我們如何建立它?使用 ReadableStream() 建構函式。
ReadableStream() 建構函式
當瀏覽器為你提供流時(例如 Fetch),從流中讀取很容易,但有時你需要建立一個自定義流並用你自己的資料塊填充它。ReadableStream() 建構函式允許你透過一種初看起來很複雜但實際上並不太糟糕的語法來做到這一點。
通用語法骨架如下所示:
const stream = new ReadableStream(
{
start(controller) {},
pull(controller) {},
cancel() {},
type,
autoAllocateChunkSize,
},
{
highWaterMark: 3,
size: () => 1,
},
);
建構函式接受兩個物件作為引數。第一個物件是必需的,它在 JavaScript 中建立一個表示資料正在從中讀取的底層源的模型。第二個物件是可選的,它允許你為流指定一個自定義佇列策略。你很少需要這樣做,所以我們暫時只關注第一個。
第一個物件最多可以包含五個成員,其中只有第一個是必需的:
start(controller)— 在ReadableStream構造後立即呼叫一次的方法。在此方法內部,你應該包含設定流功能的程式碼,例如,開始生成資料或以其他方式訪問源。pull(controller)— 一個方法,如果包含,將重複呼叫,直到流的內部佇列已滿。這可用於控制流,因為更多資料塊被排隊。cancel()— 如果包含,當應用程式發出訊號表示流要取消時(例如,如果呼叫了ReadableStream.cancel()),將呼叫此方法。其內容應執行必要的操作以釋放對流源的訪問。type和autoAllocateChunkSize— 這些(如果包含)用於表示流將是位元組流。位元組流在 使用可讀位元組流 中單獨介紹,因為它們在目的和用例上與常規(預設)流有所不同。
再次檢視我們的簡單示例程式碼,你可以看到我們的 ReadableStream() 建構函式只包含一個方法——start(),它用於從我們的 fetch 流中讀取所有資料。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
return new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
});
});
ReadableStream 控制器
你會注意到傳遞給 ReadableStream() 建構函式的 start() 和 pull() 方法被賦予了 controller 引數——它們是 ReadableStreamDefaultController 類的例項,可用於控制你的流。
在我們的示例中,我們使用控制器的 enqueue() 方法,將從 fetch 正文中讀取的值排入自定義流中。
此外,當我們完成讀取 fetch 主體時,我們使用控制器的 close() 方法來關閉自定義流——任何先前排隊的資料塊仍然可以從中讀取,但不能再排入,並且在讀取完成後流將關閉。
從自定義流中讀取
在我們的簡單流泵示例中,我們透過將其傳遞給 Response 建構函式呼叫來消費自定義可讀流,之後我們將其作為 blob() 消費。
readableStream
.then((stream) => new Response(stream))
.then((response) => response.blob())
.then((blob) => URL.createObjectURL(blob))
.then((url) => console.log((image.src = url)))
.catch((err) => console.error(err));
但自定義流仍然是 ReadableStream 例項,這意味著你可以為其附加一個讀取器。例如,請看我們的 簡單隨機流演示(也可在此處檢視即時示例),它建立了一個自定義流,將一些隨機字串排隊,然後在按下“停止字串生成”按鈕後再次從流中讀取資料。
注意:為了使用 FetchEvent.respondWith() 消費流,排隊流內容必須是 Uint8Array 型別;例如,使用 TextEncoder 進行編碼。
自定義流建構函式有一個 start() 方法,它使用 setInterval() 呼叫每秒生成一個隨機字串。ReadableStreamDefaultController.enqueue() 然後用於將其排入流中。當按鈕被按下時,間隔被取消,並呼叫名為 readStream() 的函式以再次從流中讀取資料。我們還會關閉流,因為我們已停止向其排隊資料塊。
let interval;
const stream = new ReadableStream({
start(controller) {
interval = setInterval(() => {
const string = randomChars();
// Add the string to the stream
controller.enqueue(string);
// show it on the screen
const listItem = document.createElement("li");
listItem.textContent = string;
list1.appendChild(listItem);
}, 1000);
button.addEventListener("click", () => {
clearInterval(interval);
readStream();
controller.close();
});
},
pull(controller) {
// We don't really need a pull in this example
},
cancel() {
// This is called if the reader cancels,
// so we should stop generating strings
clearInterval(interval);
},
});
在 readStream() 函式本身中,我們使用 ReadableStream.getReader() 將讀取器鎖定到流中,然後遵循我們之前看到的相同模式——使用 read() 讀取每個資料塊,檢查 done 是否為 true,如果是則結束程序,如果不是則讀取下一個資料塊並處理它,然後再執行 read() 方法。
function readStream() {
const reader = stream.getReader();
let charsReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
console.log("Stream complete");
para.textContent = result;
return;
}
charsReceived += value.length;
const chunk = value;
const listItem = document.createElement("li");
listItem.textContent = `Read ${charsReceived} characters so far. Current chunk = ${chunk}`;
list2.appendChild(listItem);
result += chunk;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
關閉和取消流
我們已經展示了使用 ReadableStreamDefaultController.close() 關閉讀取器的示例。正如我們之前所說,任何先前排隊的資料塊仍然會被讀取,但不能再排隊,因為它已關閉。
如果你想完全清除流並丟棄任何已排隊的資料塊,你可以使用 ReadableStream.cancel() 或 ReadableStreamDefaultReader.cancel()。
分叉流
有時你可能想同時讀取一個流兩次。這可以透過 ReadableStream.tee() 方法實現——它輸出一個數組,其中包含原始可讀流的兩個相同副本,然後可以由兩個獨立的讀取器獨立讀取。
你可以在 ServiceWorker 中執行此操作,例如,如果你想從伺服器獲取響應並將其流式傳輸到瀏覽器,同時也將其流式傳輸到 Service Worker 快取。由於響應主體不能被消費多次,並且一個流不能同時被多個讀取器讀取,因此你需要兩個副本才能完成此操作。
我們在簡單分叉示例(也可以在此處檢視即時示例)中提供了相關示例。此示例的工作方式與我們的簡單隨機流非常相似,不同之處在於,當按下按鈕停止生成隨機字串時,會獲取自定義流並進行分叉,然後讀取這兩個結果流。
function teeStream() {
const teedOff = stream.tee();
readStream(teedOff[0], list2);
readStream(teedOff[1], list3);
}
管道鏈
流的另一個特性是能夠將流相互管道化(稱為管道鏈)。這涉及兩種方法——ReadableStream.pipeThrough(),它透過寫入器/讀取器對管道一個可讀流,以將一種資料格式轉換為另一種格式;以及ReadableStream.pipeTo(),它將一個可讀流管道到一個充當管道鏈端點的寫入器。
我們確實有一個名為解包 PNG 塊的示例(也可在此處檢視即時示例),該示例將影像作為流獲取,然後將其管道到一個自定義的 PNG 轉換流,該流從二進位制資料流中檢索 PNG 塊。
// Fetch the original image
fetch("png-logo.png")
// Retrieve its body as ReadableStream
.then((response) => response.body)
// Create a gray-scaled PNG stream out of the original
.then((rs) => logReadableStream("Fetch Response Stream", rs))
.then((body) => body.pipeThrough(new PNGTransformStream()))
.then((rs) => logReadableStream("PNG Chunk Stream", rs));
我們還沒有使用 TransformStream 的示例。
總結
這解釋了“預設”可讀流的基礎知識。
請參閱使用可讀位元組流,瞭解如何使用可讀位元組流:這些流具有底層位元組源,可以高效地執行零複製傳輸到消費者,繞過流的內部佇列。