Source code

Revision control

Copy as Markdown

Other Tools

const SAME_COMPARTMENT = "same-compartment";
const IFRAME_COMPARTMENT = "iframe-compartment";
const BIG_BUFFER_SIZE = 1000000;
const ITER_MAX = 10;
function makeBuffer(size) {
let buffer = new Uint8Array(size);
buffer.fill(42);
let value = 0;
for (let i = 0; i < 1000000; i += 1000) {
buffer.set([++value % 255], i);
}
return buffer;
}
function apply_compartment(compartment, data) {
if (compartment == SAME_COMPARTMENT) {
return self[data.func](data.args, self);
}
if (compartment == IFRAME_COMPARTMENT) {
const iframe = document.querySelector("#iframe").contentWindow;
return iframe[data.func](data.args, self);
}
ok(false, "Invalid compartment value");
return undefined;
}
async function test_nativeStream(compartment) {
info("test_nativeStream");
let r = await fetch("/");
return apply_compartment(compartment, {
func: "test_nativeStream_continue",
args: r,
});
}
async function test_nativeStream_continue(r, that) {
that.ok(r.body instanceof that.ReadableStream, "We have a ReadableStream");
let a = r.clone();
that.ok(a instanceof that.Response, "We have a cloned Response");
that.ok(a.body instanceof that.ReadableStream, "We have a ReadableStream");
let b = a.clone();
that.ok(b instanceof that.Response, "We have a cloned Response");
that.ok(b.body instanceof that.ReadableStream, "We have a ReadableStream");
let blob = await r.blob();
that.ok(blob instanceof that.Blob, "We have a blob");
let d = await a.body.getReader().read();
that.ok(!d.done, "We have read something!");
blob = await b.blob();
that.ok(blob instanceof that.Blob, "We have a blob");
}
async function test_timeout(compartment) {
info("test_timeout");
let blob = new Blob([""]);
let r = await fetch(URL.createObjectURL(blob));
return apply_compartment(compartment, {
func: "test_timeout_continue",
args: r,
});
}
async function test_timeout_continue(r, that) {
await r.body.getReader().read();
await new Promise(resolve => setTimeout(resolve, 0));
try {
await r.blob();
that.ok(false, "We cannot have a blob here!");
} catch (exc) {
that.ok(true, "We cannot have a blob here!");
}
}
async function test_nonNativeStream(compartment) {
info("test_nonNativeStream");
let buffer = makeBuffer(BIG_BUFFER_SIZE);
info("Buffer size: " + buffer.byteLength);
let r = new Response(
new ReadableStream({
start: controller => {
controller.enqueue(buffer);
controller.close();
},
})
);
return apply_compartment(compartment, {
func: "test_nonNativeStream_continue",
args: { r, buffer },
});
}
async function test_nonNativeStream_continue(data, that) {
that.ok(
data.r.body instanceof that.ReadableStream,
"We have a ReadableStream"
);
let a = data.r.clone();
that.ok(a instanceof that.Response, "We have a cloned Response");
that.ok(a.body instanceof that.ReadableStream, "We have a ReadableStream");
let b = a.clone();
that.ok(b instanceof that.Response, "We have a cloned Response");
that.ok(b.body instanceof that.ReadableStream, "We have a ReadableStream");
let blob = await data.r.blob();
that.ok(blob instanceof that.Blob, "We have a blob");
let d = await a.body.getReader().read();
that.ok(!d.done, "We have read something!");
blob = await b.blob();
that.ok(blob instanceof that.Blob, "We have a blob");
that.is(blob.size, data.buffer.byteLength, "Blob size matches");
}
async function test_noUint8Array(compartment) {
info("test_noUint8Array");
let r = new Response(
new ReadableStream({
start: controller => {
controller.enqueue("hello world!");
controller.close();
},
})
);
return apply_compartment(compartment, {
func: "test_noUint8Array_continue",
args: r,
});
}
async function test_noUint8Array_continue(r, that) {
that.ok(r.body instanceof that.ReadableStream, "We have a ReadableStream");
try {
await r.blob();
that.ok(false, "We cannot have a blob here!");
} catch (ex) {
that.ok(true, "We cannot have a blob here!");
}
}
async function test_pendingStream(compartment) {
let r = new Response(
new ReadableStream({
start: controller => {
controller.enqueue(makeBuffer(BIG_BUFFER_SIZE));
// Let's keep this controler open.
self.ccc = controller;
},
})
);
return apply_compartment(compartment, {
func: "test_pendingStream_continue",
args: r,
});
}
async function test_pendingStream_continue(r, that) {
let d = await r.body.getReader().read();
that.ok(!d.done, "We have read something!");
if ("close" in that) {
that.close();
}
}
async function test_nativeStream_cache(compartment) {
info("test_nativeStream_cache");
let origBody = "123456789abcdef";
let url = "/nativeStream";
let cache = await caches.open("nativeStream");
info("Storing a body as a string");
await cache.put(url, new Response(origBody));
return apply_compartment(compartment, {
func: "test_nativeStream_cache_continue",
args: { caches, cache, url, origBody },
});
}
async function test_nativeStream_cache_continue(data, that) {
that.info("Retrieving the stored value");
let cacheResponse = await data.cache.match(data.url);
that.info("Converting the response to text");
let cacheBody = await cacheResponse.text();
that.is(data.origBody, cacheBody, "Bodies match");
await data.caches.delete("nativeStream");
}
async function test_nonNativeStream_cache(compartment) {
info("test_nonNativeStream_cache");
let url = "/nonNativeStream";
let cache = await caches.open("nonNativeStream");
let buffer = makeBuffer(BIG_BUFFER_SIZE);
info("Buffer size: " + buffer.byteLength);
info("Storing a body as a string");
let r = new Response(
new ReadableStream({
start: controller => {
controller.enqueue(buffer);
controller.close();
},
})
);
return apply_compartment(compartment, {
func: "test_nonNativeStream_cache_continue",
args: { caches, cache, buffer, r },
});
}
async function test_nonNativeStream_cache_continue(data, that) {
await data.cache.put(data.url, data.r);
that.info("Retrieving the stored value");
let cacheResponse = await data.cache.match(data.url);
that.info("Converting the response to text");
let cacheBody = await cacheResponse.arrayBuffer();
that.ok(cacheBody instanceof that.ArrayBuffer, "Body is an array buffer");
that.is(cacheBody.byteLength, BIG_BUFFER_SIZE, "Body length is correct");
let value = 0;
for (let i = 0; i < 1000000; i += 1000) {
that.is(
new Uint8Array(cacheBody)[i],
++value % 255,
"byte in position " + i + " is correct"
);
}
await data.caches.delete("nonNativeStream");
}
async function test_codeExecution(compartment) {
info("test_codeExecution");
let r = new Response(
new ReadableStream({
start(c) {
controller = c;
},
pull() {
console.log("pull called");
},
})
);
return apply_compartment(compartment, {
func: "test_codeExecution_continue",
args: r,
});
}
// This is intended to just be a drop-in replacement for an old observer
// notification.
function addConsoleStorageListener(listener) {
const ConsoleAPIStorage = SpecialPowers.Cc[
"@mozilla.org/consoleAPI-storage;1"
].getService(SpecialPowers.Ci.nsIConsoleAPIStorage);
listener.__handler = (message, id) => {
listener.observe(message, id);
};
ConsoleAPIStorage.addLogEventListener(
listener.__handler,
SpecialPowers.wrap(document).nodePrincipal
);
}
function removeConsoleStorageListener(listener) {
const ConsoleAPIStorage = SpecialPowers.Cc[
"@mozilla.org/consoleAPI-storage;1"
].getService(SpecialPowers.Ci.nsIConsoleAPIStorage);
ConsoleAPIStorage.removeLogEventListener(listener.__handler);
}
async function test_codeExecution_continue(r, that) {
function consoleListener() {
addConsoleStorageListener(this);
}
var promise = new Promise(resolve => {
consoleListener.prototype = {
observe(aSubject) {
that.ok(true, "Something has been received");
var obj = aSubject.wrappedJSObject;
if (obj.arguments[0] && obj.arguments[0] === "pull called") {
that.ok(true, "Message received!");
removeConsoleStorageListener(this);
resolve();
}
},
};
});
var cl = new consoleListener();
r.body.getReader().read();
await promise;
}
async function test_global(compartment) {
info("test_global: " + compartment);
self.foo = 42;
self.iter = ITER_MAX;
let r = new Response(
new ReadableStream({
start(c) {
self.controller = c;
},
pull() {
if (!("iter" in self) || self.iter < 0 || self.iter > ITER_MAX) {
throw "Something bad is happening here!";
}
let buffer = new Uint8Array(1);
buffer.fill(self.foo);
self.controller.enqueue(buffer);
if (--self.iter == 0) {
controller.close();
}
},
})
);
return apply_compartment(compartment, {
func: "test_global_continue",
args: r,
});
}
async function test_global_continue(r, that) {
let a = await r.arrayBuffer();
that.is(
Object.getPrototypeOf(a),
that.ArrayBuffer.prototype,
"Body is an array buffer"
);
that.is(a.byteLength, ITER_MAX, "Body length is correct");
for (let i = 0; i < ITER_MAX; ++i) {
that.is(new Uint8Array(a)[i], 42, "Byte " + i + " is correct");
}
}
function workify(func) {
info("Workifying " + func);
return new Promise((resolve, reject) => {
let worker = new Worker("worker_readableStreams.js");
worker.postMessage(func);
worker.onmessage = function (e) {
if (e.data.type == "done") {
resolve();
return;
}
if (e.data.type == "error") {
reject(e.data.message);
return;
}
if (e.data.type == "test") {
ok(e.data.test, e.data.message);
return;
}
if (e.data.type == "info") {
info(e.data.message);
}
};
});
}