Source code

Revision control

Copy as Markdown

Other Tools

// |reftest| skip-if(!this.ReadableStream||!this.drainJobQueue)
if ("ignoreUnhandledRejections" in this) {
ignoreUnhandledRejections();
}
// Example of a stream that enqueues data asynchronously, whether the reader
// wants it or not, the "push" model.
let fbStream = new ReadableStream({
start(controller) {
simulatePacketsDriftingIn(controller);
},
});
async function simulatePacketsDriftingIn(controller) {
for (let i = 1; i <= 30; i++) {
let importantData =
(i % 15 == 0 ? "FizzBuzz" :
i % 5 == 0 ? "Buzz":
i % 3 == 0 ? "Fizz" :
String(i));
controller.enqueue(importantData);
await asyncSleep(1 + i % 7);
}
controller.close();
}
const expected = [
"1", "2", "Fizz", "4", "Buzz", "Fizz", "7", "8", "Fizz", "Buzz",
"11", "Fizz", "13", "14", "FizzBuzz", "16", "17", "Fizz", "19", "Buzz",
"Fizz", "22", "23", "Fizz", "Buzz", "26", "Fizz", "28", "29", "FizzBuzz"
];
async function test() {
assertEq(fbStream.locked, false);
let reader = fbStream.getReader();
assertEq(fbStream.locked, true);
let results = [];
while (true) {
let r = await reader.read();
if (r.done) {
break;
}
results.push(r.value);
}
assertEq(results.join("-"), expected.join("-"));
reader.releaseLock();
assertEq(fbStream.locked, false);
}
runAsyncTest(test);