使用可讀位元組流

可讀位元組流可讀流,其底層位元組源的type: "bytes",並支援從底層源到消費者的高效零複製資料傳輸(繞過流的內部佇列)。它們適用於資料可能以任意大小且可能非常大的塊提供或請求的情況,因此避免複製可能會提高效率。

本文解釋了可讀位元組流與普通“預設”流的區別,以及如何建立和使用它們。

注意: 可讀位元組流與“普通”可讀流幾乎相同,並且幾乎所有概念都相同。本文假設您已經理解這些概念,並且只會對其進行表面介紹(如果需要的話)。如果您不熟悉相關概念,請先閱讀:使用可讀流流的概念和使用概述以及Streams API 概念

概述

可讀流提供了一個一致的介面,用於將資料從某個底層源(例如檔案或套接字)流式傳輸到消費者(例如讀取器、轉換流或可寫流)。在普通可讀流中,來自底層源的資料始終透過內部佇列傳遞給消費者。可讀位元組流的不同之處在於,如果內部佇列為空,則底層源可以直接寫入消費者(高效的零複製傳輸)。

透過在underlyingSource物件中指定type: "bytes"來建立可讀位元組流,該物件可以作為第一個引數傳遞給ReadableStream()建構函式。設定此值後,流將使用ReadableByteStreamController建立,並且當呼叫start(controller)pull(controller)回撥函式時,此物件將傳遞給底層源。

ReadableByteStreamController與預設控制器(ReadableStreamDefaultController)的主要區別在於它有一個額外的屬性ReadableByteStreamController.byobRequest,型別為ReadableStreamBYOBRequest。這表示消費者發出的待處理讀取請求,將作為零複製傳輸從底層源進行。如果沒有待處理請求,則此屬性為null

byobRequest僅在對可讀位元組流發出讀取請求且流的內部佇列中沒有資料時可用(如果存在資料,則從這些佇列中滿足請求)。

需要傳輸資料的底層位元組源必須檢查byobRequest屬性,如果可用,則使用它傳輸資料。如果該屬性為null,則傳入資料應使用ReadableByteStreamController.enqueue()新增到流的內部佇列中(這是使用“預設”流時傳輸資料的唯一方式)。

ReadableStreamBYOBRequest具有一個view屬性,它是為傳輸分配的緩衝區上的一個檢視。來自底層源的資料應寫入此屬性,然後底層源必須呼叫respond(),指示寫入的位元組數。這表示資料應被傳輸,並且消費者的待處理讀取請求已解決。呼叫respond()後,view將不能再被寫入。

還有一個額外的方法ReadableStreamBYOBRequest.respondWithNewView(),底層源可以將包含待傳輸資料的“新”檢視傳遞給它。這個新檢視必須位於與原始檢視相同的記憶體緩衝區上,並從相同的起始偏移量開始。如果底層位元組源需要先將檢視傳輸到工作執行緒進行填充(例如),然後才能響應byobRequest,則可以使用此方法。在大多數情況下,不需要此方法。

可讀位元組流通常使用ReadableStreamBYOBReader讀取,可以透過在流上呼叫ReadableStream.getReader()並指定選項引數中的mode: "byob"來獲取。

可讀位元組流也可以使用預設讀取器(ReadableStreamDefaultReader)讀取,但在這種情況下,僅當流啟用了自動緩衝區分配時才建立byobRequest物件(為流的underlyingSource設定了autoAllocateChunkSize)。請注意,在這種情況下,autoAllocateChunkSize指示的大小用於緩衝區大小;對於位元組讀取器,使用的緩衝區由消費者提供。如果未指定該屬性,預設讀取器仍然會“工作”,但底層源永遠不會收到byobRequest,並且所有資料都將透過流的內部佇列傳輸。

除了上述差異之外,位元組流的控制器和底層源與預設流的控制器和底層源非常相似,並且使用方式大同小異

示例

具有位元組讀取器的底層推送源

這個即時示例展示瞭如何使用推送底層位元組源建立可讀位元組流,並使用位元組讀取器讀取它。

與拉式底層位元組源不同,資料可以隨時到達。因此,底層源必須使用controller.byobRequest來傳輸傳入資料(如果存在),否則將資料排隊到流的內部佇列中。此外,由於資料可以隨時到達,因此在underlyingSource.start()回撥函式中設定了監控行為。

該示例深受流規範中推送位元組源示例的影響。它使用一個模擬的“假設套接字”源,該源提供任意大小的資料。讀取器在不同點被有意延遲,以允許底層源使用傳輸和排隊向流傳送資料。未演示背壓支援。

注意: 底層位元組源也可以與預設讀取器一起使用。如果啟用了自動緩衝區分配,當讀取器有未完成的請求且流的內部佇列為空時,控制器將提供固定大小的緩衝區用於零複製傳輸。如果未啟用自動緩衝區分配,則位元組流中的所有資料將始終排隊。這類似於“拉取:底層位元組源示例”中所示的行為。

模擬的底層套接字源

模擬的底層源有三個重要方法

  • select2()表示套接字上的未完成請求。它返回一個Promise,當資料可用時解析。
  • readInto()將資料從套接字讀取到提供的緩衝區中,然後清除資料。
  • close()關閉套接字。

實現非常簡單。如下所示,select2()在超時時建立一個隨機大小的隨機資料緩衝區。建立的資料在readInto()中被讀取到緩衝區然後清除。

js
class MockHypotheticalSocket {
  constructor() {
    this.max_data = 800; // total amount of data to stream from "socket"
    this.max_per_read = 100; // max data per read
    this.min_per_read = 40; // min data per read
    this.data_read = 0; // total data read so far (capped is maxdata)
    this.socketData = null;
  }

  // Method returning promise when this socket is readable.
  select2() {
    // Object used to resolve promise
    const resultObj = {};
    resultObj["bytesRead"] = 0;

    return new Promise((resolve /*, reject */) => {
      if (this.data_read >= this.max_data) {
        // Out of data
        resolve(resultObj);
        return;
      }

      // Emulate slow read of data
      setTimeout(() => {
        const numberBytesReceived = this.getNumberRandomBytesSocket();
        this.data_read += numberBytesReceived;
        this.socketData = this.randomByteArray(numberBytesReceived);
        resultObj["bytesRead"] = numberBytesReceived;
        resolve(resultObj);
      }, 500);
    });
  }

  /* Read data into specified buffer offset */
  readInto(buffer, offset, length) {
    let dataLength = 0;
    if (this.socketData) {
      dataLength = this.socketData.length;
      const myView = new Uint8Array(buffer, offset, length);
      // Write the length of data specified into buffer
      // Code assumes buffer always bigger than incoming data
      for (let i = 0; i < dataLength; i++) {
        myView[i] = this.socketData[i];
      }
      this.socketData = null; // Clear "socket" data after reading
    }
    return dataLength;
  }

  // Dummy close function
  close() {}

  // Return random number bytes in this call of socket
  getNumberRandomBytesSocket() {
    // Capped to remaining data and the max min return-per-read range
    const remaining_data = this.max_data - this.data_read;
    const numberBytesReceived =
      remaining_data < this.min_per_read
        ? remaining_data
        : this.getRandomIntInclusive(
            this.min_per_read,
            Math.min(this.max_per_read, remaining_data),
          );
    return numberBytesReceived;
  }

  // Return random number between two values
  getRandomIntInclusive(min, max) {
    min = Math.ceil(min);
    max = Math.floor(max);
    return Math.floor(Math.random() * (max - min + 1) + min);
  }

  // Return random character string
  randomChars(length = 8) {
    let string = "";
    let choices =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";

    for (let i = 0; i < length; i++) {
      string += choices.charAt(Math.floor(Math.random() * choices.length));
    }
    return string;
  }

  /* Return random Uint8Array of bytes */
  randomByteArray(bytes = 8) {
    const textEncoder = new TextEncoder();
    return textEncoder.encode(this.randomChars(bytes));
  }
}

建立可讀的套接字推送位元組流

以下程式碼展示瞭如何定義可讀的套接字“推送”位元組流。

underlyingSource物件定義作為第一個引數傳遞給ReadableStream()建構函式。為了使其成為可讀“位元組”流,我們指定type: "bytes"作為該物件的屬性。這確保流獲得一個ReadableByteStreamController(而不是預設控制器(ReadableStreamDefaultController))

由於資料可能會在消費者準備好處理之前到達套接字,因此所有關於讀取底層源的配置都在start()回撥方法中(我們不等待拉取才開始處理資料)。實現會開啟“套接字”並呼叫select2()來請求資料。當返回的 Promise 解析時,程式碼會檢查controller.byobRequest是否存在(是否不為null),如果存在,則呼叫socket.readInto()將資料複製到請求中並傳輸。如果byobRequest不存在,則沒有來自消費流的未完成請求可以作為零複製傳輸來滿足。在這種情況下,使用controller.enqueue()將資料複製到流的內部佇列。

對更多資料的select2()請求會重複釋出,直到返回沒有資料的請求。此時,控制器用於關閉流。

js
const stream = makeSocketStream("dummy host", "dummy port");

const DEFAULT_CHUNK_SIZE = 400;

function makeSocketStream(host, port) {
  const socket = new MockHypotheticalSocket();

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      readRepeatedly().catch((e) => controller.error(e));
      function readRepeatedly() {
        return socket.select2().then(() => {
          // Since the socket can become readable even when there's
          // no pending BYOB requests, we need to handle both cases.
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            if (bytesRead === 0) {
              controller.close();
            }
            controller.byobRequest.respond(bytesRead);
            logSource(`byobRequest with ${bytesRead} bytes`);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            if (bytesRead === 0) {
              controller.close();
            } else {
              controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
            }
            logSource(`enqueue() ${bytesRead} bytes (no byobRequest)`);
          }

          if (bytesRead === 0) {
            return;
            // no more bytes in source
          }
          return readRepeatedly();
        });
      }
    },

    cancel() {
      socket.close();
      logSource(`cancel(): socket closed`);
    },
  });
}

請注意,readRepeatedly()返回一個 Promise,我們使用它來捕獲設定或處理讀取操作時發生的任何錯誤。然後,錯誤將傳遞給控制器,如上所示(參見readRepeatedly().catch((e) => controller.error(e));)。

最後提供了cancel()方法以關閉底層源;pull()回撥不需要,因此未實現。

消費推送位元組流

以下程式碼為套接字位元組流建立了一個ReadableStreamBYOBReader,並使用它將資料讀取到緩衝區中。請注意,processText()被遞迴呼叫以讀取更多資料,直到緩衝區被填滿。當底層源發出訊號表示沒有更多資料時,reader.read()將把done設定為true,這反過來會完成讀取操作。

此程式碼與上面具有位元組讀取器的底層拉取源示例幾乎完全相同。唯一的區別是讀取器包含一些減慢讀取速度的程式碼,因此日誌輸出可以演示如果讀取速度不夠快,資料將被排隊。

js
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let offset = 0;

  while (offset < buffer.byteLength) {
    // read() returns a promise that resolves when a value has been received
    reader
      .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
      .then(async function processText({ done, value }) {
        // Result objects contain two properties:
        // done  - true if the stream has already given all its data.
        // value - some data. Always undefined when done is true.

        if (done) {
          logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
          return;
        }

        buffer = value.buffer;
        offset += value.byteLength;
        bytesReceived += value.byteLength;

        // logConsumer(`Read ${bytesReceived} bytes: ${value}`);
        logConsumer(`Read ${bytesReceived} bytes`);
        result += value;

        // Add delay to emulate when data can't be read and data is enqueued
        if (bytesReceived > 300 && bytesReceived < 600) {
          logConsumer(`Delaying read to emulate slow stream reading`);
          const delay = (ms) =>
            new Promise((resolve) => {
              setTimeout(resolve, ms);
            });
          await delay(1000);
        }

        // Read some more, and call this function again
        return reader
          .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
          .then(processText);
      });
  }
}

使用讀取器取消流

我們可以使用ReadableStreamBYOBReader.cancel()取消流。對於這個例子,如果點選按鈕並帶有“使用者選擇”的原因(未顯示按鈕的其他 HTML 和程式碼),我們就會呼叫該方法。我們還會記錄取消操作完成的時間。

js
button.addEventListener("click", () => {
  reader
    .cancel("user choice")
    .then(() => logConsumer("reader.cancel complete"));
});

ReadableStreamBYOBReader.releaseLock()可用於釋放讀取器而不取消流。但是請注意,任何未完成的讀取請求將立即被拒絕。稍後可以獲取新的讀取器以讀取剩餘的塊。

監控流的關閉/錯誤

ReadableStreamBYOBReader.closed屬性返回一個Promise,當流關閉時解析,如果發生錯誤則拒絕。雖然在這種情況下不會出現錯誤,但以下程式碼應記錄完成情況。

js
reader.closed
  .then(() => {
    logConsumer("ReadableStreamBYOBReader.closed: resolved");
  })
  .catch(() => {
    logConsumer("ReadableStreamBYOBReader.closed: rejected:");
  });

結果

底層推送源(左)和消費者(右)的日誌輸出如下所示。請注意中間資料被排隊而不是作為零複製操作傳輸的時期。

具有位元組讀取器的底層拉取源

這個即時示例展示瞭如何從“拉取”底層位元組源(例如檔案)讀取資料,並透過流以零複製傳輸方式將其傳輸到ReadableStreamBYOBReader

模擬的底層檔案源

對於底層拉取源,我們使用以下類(非常粗略地)模擬 Node.js 的FileHandle,特別是read()方法。該類生成隨機資料以表示檔案。read()方法將“半隨機”大小的隨機資料塊從指定位置讀取到提供的緩衝區中。close()方法不執行任何操作:它僅用於演示在定義流的建構函式時可以在何處關閉源。

注意: 所有“拉取源”示例都使用類似的類。此處僅供參考(以便清楚地表明它是模擬的)。

js
class MockUnderlyingFileHandle {
  constructor() {
    this.maxdata = 100; // "file size"
    this.maxReadChunk = 25; // "max read chunk size"
    this.minReadChunk = 13; // "min read chunk size"
    this.filedata = this.randomByteArray(this.maxdata);
    this.position = 0;
  }

  // Read data from "file" at position/length into specified buffer offset
  read(buffer, offset, length, position) {
    // Object used to resolve promise
    const resultObj = {};
    resultObj["buffer"] = buffer;
    resultObj["bytesRead"] = 0;

    return new Promise((resolve /*, reject */) => {
      if (position >= this.maxdata) {
        // Out of data
        resolve(resultObj);
        return;
      }

      // Simulate a file read that returns random numbers of bytes
      // Read minimum of bytes requested and random bytes that can be returned
      let readLength =
        Math.floor(
          Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
        ) + this.minReadChunk;
      readLength = length > readLength ? readLength : length;

      // Read random data into supplied buffer
      const myView = new Uint8Array(buffer, offset, readLength);
      // Write the length of data specified
      for (let i = 0; i < readLength; i++) {
        myView[i] = this.filedata[position + i];
        resultObj["bytesRead"] = i + 1;
        if (position + i + 1 >= this.maxdata) {
          break;
        }
      }
      // Emulate slow read of data
      setTimeout(() => {
        resolve(resultObj);
      }, 1000);
    });
  }

  // Dummy close function
  close() {}

  // Return random character string
  randomChars(length = 8) {
    let string = "";
    let choices =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";

    for (let i = 0; i < length; i++) {
      string += choices.charAt(Math.floor(Math.random() * choices.length));
    }
    return string;
  }

  // Return random Uint8Array of bytes
  randomByteArray(bytes = 8) {
    const textEncoder = new TextEncoder();
    return textEncoder.encode(this.randomChars(bytes));
  }
}

建立可讀檔案位元組流

以下程式碼展示瞭如何定義可讀檔案位元組流。

與上一個示例一樣,underlyingSource物件定義作為第一個引數傳遞給ReadableStream()建構函式。為了使其成為可讀“位元組”流,我們指定type: "bytes"作為該物件的屬性。這確保流獲得一個ReadableByteStreamController

start()函式只是開啟檔案控制代碼,然後在cancel()回撥中關閉。提供cancel()是為了在呼叫ReadableStream.cancel()ReadableStreamDefaultController.close()時清理任何資源。

大部分有趣的程式碼都在pull()回撥中。它將資料從檔案複製到待處理的讀取請求(ReadableByteStreamController.byobRequest),然後呼叫respond()以指示緩衝區中有多少資料並傳輸它。如果從檔案傳輸了 0 位元組,則我們知道所有資料都已複製,並呼叫控制器上的close(),這反過來將導致在底層源上呼叫cancel()

js
const stream = makeReadableByteFileStream("dummy file.txt");

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes", // An underlying byte stream!
    start(controller) {
      // Called to initialize the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(
        `start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
    },
    async pull(controller) {
      // Called when there is a pull request for data
      const theView = controller.byobRequest.view;
      const { bytesRead, buffer } = await fileHandle.read(
        theView.buffer,
        theView.byteOffset,
        theView.byteLength,
        position,
      );
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
        logSource(
          `pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
        );
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
        logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via reader or controller).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    },
  });
}

消費位元組流

以下程式碼為檔案位元組流建立了一個ReadableStreamBYOBReader,並使用它將資料讀取到緩衝區中。請注意,processText()被遞迴呼叫以讀取更多資料,直到緩衝區被填滿。當底層源發出訊號表示沒有更多資料時,reader.read()將把done設定為true,這反過來會完成讀取操作。

js
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(200);
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let offset = 0;

  // read() returns a promise that resolves when a value has been received
  reader
    .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
    .then(function processText({ done, value }) {
      // Result objects contain two properties:
      // done  - true if the stream has already given all its data.
      // value - some data. Always undefined when done is true.

      if (done) {
        logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
        return;
      }

      buffer = value.buffer;
      offset += value.byteLength;
      bytesReceived += value.byteLength;

      logConsumer(
        `Read ${value.byteLength} (${bytesReceived}) bytes: ${value}`,
      );
      result += value;

      // Read some more, and call this function again
      return reader
        .read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
        .then(processText);
    });
}

最後,我們新增一個處理程式,當點選按鈕時(未顯示按鈕的其他 HTML 和程式碼)將取消流。

js
button.addEventListener("click", () => {
  reader.cancel("user choice").then(() => {
    logConsumer(`reader.cancel complete`);
  });
});

結果

底層拉取源(左)和消費者(右)的日誌記錄如下所示。特別值得注意的是:

  • start()函式傳遞了一個ReadableByteStreamController
  • 傳遞給讀取器的緩衝區足夠大,可以包含整個“檔案”。底層資料來源以隨機大小的塊提供資料。

具有預設讀取器的底層拉取源

這個即時示例展示瞭如何使用預設讀取器(ReadableStreamDefaultReader)以零複製傳輸方式讀取相同的資料。這使用了與上一個示例中相同的模擬底層檔案源

建立具有自動緩衝區分配的可讀檔案位元組流

我們底層源的唯一區別在於,我們必須指定autoAllocateChunkSize,並且該大小將用作controller.byobRequest的檢視緩衝區大小,而不是由消費者提供的大小。

js
const DEFAULT_CHUNK_SIZE = 20;
const stream = makeReadableByteFileStream("dummy file.txt");

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes", // An underlying byte stream!
    start(controller) {
      // Called to initialize the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(
        `start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
    },
    async pull(controller) {
      // Called when there is a pull request for data
      const theView = controller.byobRequest.view;
      const { bytesRead, buffer } = await fileHandle.read(
        theView.buffer,
        theView.byteOffset,
        theView.byteLength,
        position,
      );
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
        logSource(
          `pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
        );
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
        logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via reader or controller).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    },
    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, // Only relevant if using a default reader
  });
}

使用預設讀取器消費位元組流

以下程式碼透過呼叫stream.getReader();(不指定模式)為檔案位元組流建立了一個ReadableStreamDefaultReader,並使用它將資料讀取到緩衝區中。程式碼的操作與上一個示例相同,只是緩衝區由流而不是消費者提供。

js
const reader = stream.getReader();
readStream(reader);

function readStream(reader) {
  let bytesReceived = 0;
  let result = "";

  // read() returns a promise that resolves
  // when a value has been received
  reader.read().then(function processText({ done, value }) {
    // Result objects contain two properties:
    // done  - true if the stream has already given you all its data.
    // value - some data. Always undefined when done is true.
    if (done) {
      logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
      return;
    }

    bytesReceived += value.length;
    logConsumer(
      `Read ${value.length} (${bytesReceived}). Current bytes = ${value}`,
    );
    result += value;

    // Read some more, and call this function again
    return reader.read().then(processText);
  });
}

最後,我們新增一個處理程式,當點選按鈕時(未顯示按鈕的其他 HTML 和程式碼)將取消流。

js
button.addEventListener("click", () => {
  reader.cancel("user choice").then(() => {
    logConsumer(`reader.cancel complete`);
  });
});

結果

底層位元組拉取源(左)和消費者(右)的日誌記錄如下所示。

請注意,現在塊的寬度最多為 20 位元組,因為這是底層位元組源中指定的自動分配緩衝區的大小(autoAllocateChunkSize)。這些都是零複製傳輸。

具有預設讀取器且不進行分配的底層拉取源

為了完整起見,我們還可以使用不自動分配緩衝區的預設讀取器與位元組源。

然而在這種情況下,控制器不會為底層源提供byobRequest以供寫入。相反,底層源將不得不將資料排隊。請注意,在pull()中,為了支援這種情況,我們需要檢查byobRequest是否存在。

js
const stream = makeReadableByteFileStream("dummy file.txt");
const DEFAULT_CHUNK_SIZE = 40;

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;
  return new ReadableStream({
    type: "bytes", // An underlying byte stream!
    start(controller) {
      // Called to initialize the underlying source.
      // For a file source open a file handle (here we just create the mocked object).
      fileHandle = new MockUnderlyingFileHandle();
      logSource(
        `start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
    },
    async pull(controller) {
      // Called when there is a pull request for data
      if (controller.byobRequest) {
        const theView = controller.byobRequest.view;
        const { bytesRead, buffer } = await fileHandle.read(
          theView.buffer,
          theView.byteOffset,
          theView.byteLength,
          position,
        );
        if (bytesRead === 0) {
          await fileHandle.close();
          controller.close();
          controller.byobRequest.respond(0);
          logSource(
            `pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
          );
        } else {
          position += bytesRead;
          controller.byobRequest.respond(bytesRead);
          logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
        }
      } else {
        // No BYOBRequest so enqueue data to stream
        // NOTE, this branch would only execute for a default reader if autoAllocateChunkSize is not defined.
        const myNewBuffer = new Uint8Array(DEFAULT_CHUNK_SIZE);
        const { bytesRead, buffer } = await fileHandle.read(
          myNewBuffer.buffer,
          myNewBuffer.byteOffset,
          myNewBuffer.byteLength,
          position,
        );
        if (bytesRead === 0) {
          await fileHandle.close();
          controller.close();
          controller.enqueue(myNewBuffer);
          logSource(
            `pull() with no byobRequest. Close controller (read bytes: ${bytesRead})`,
          );
        } else {
          position += bytesRead;
          controller.enqueue(myNewBuffer);
          logSource(`pull() with no byobRequest. enqueue() ${bytesRead} bytes`);
        }
      }
    },
    cancel(reason) {
      // This is called if the stream is cancelled (via reader or controller).
      // Clean up any resources
      fileHandle.close();
      logSource(`cancel() with reason: ${reason}`);
    },
  });
}

結果

底層拉取源(左)和消費者(右)的日誌記錄如下所示。請注意,底層源側顯示資料已排隊,而不是零位元組傳輸。

另見