Source code

Revision control

Copy as Markdown

Other Tools

'use strict';
const assert = require('assert');
const EventEmitter = require('events');
const { createServer } = require('http');
const { Duplex } = require('stream');
const { randomBytes } = require('crypto');
const createWebSocketStream = require('../lib/stream');
const Sender = require('../lib/sender');
const WebSocket = require('..');
const { EMPTY_BUFFER } = require('../lib/constants');
describe('createWebSocketStream', () => {
it('is exposed as a property of the `WebSocket` class', () => {
assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
});
it('returns a `Duplex` stream', () => {
const duplex = createWebSocketStream(new EventEmitter());
assert.ok(duplex instanceof Duplex);
});
it('passes the options object to the `Duplex` constructor', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws, {
allowHalfOpen: false,
encoding: 'utf8'
});
duplex.on('data', (chunk) => {
assert.strictEqual(chunk, 'hi');
duplex.on('close', () => {
wss.close(done);
});
});
});
wss.on('connection', (ws) => {
ws.send(Buffer.from('hi'));
ws.close();
});
});
describe('The returned stream', () => {
it('buffers writes if `readyState` is `CONNECTING`', (done) => {
const chunk = randomBytes(1024);
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
const duplex = createWebSocketStream(ws);
duplex.write(chunk);
});
wss.on('connection', (ws) => {
ws.on('message', (message, isBinary) => {
ws.on('close', (code, reason) => {
assert.deepStrictEqual(message, chunk);
assert.ok(isBinary);
assert.strictEqual(code, 1005);
assert.strictEqual(reason, EMPTY_BUFFER);
wss.close(done);
});
});
ws.close();
});
});
it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('error', (err) => {
assert.ok(duplex.destroyed);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 2 (CLOSING)'
);
duplex.on('close', () => {
wss.close(done);
});
});
ws.on('open', () => {
ws._receiver.on('conclude', () => {
duplex.write('hi');
});
});
});
wss.on('connection', (ws) => {
ws.close();
});
});
it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('error', (err) => {
assert.ok(duplex.destroyed);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'WebSocket is not open: readyState 3 (CLOSED)'
);
duplex.on('close', () => {
wss.close(done);
});
});
ws.on('close', () => {
duplex.write('hi');
});
});
wss.on('connection', (ws) => {
ws.close();
});
});
it('does not error if `_final()` is called while connecting', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
const duplex = createWebSocketStream(ws);
duplex.on('close', () => {
wss.close(done);
});
duplex.resume();
duplex.end();
});
});
it('makes `_final()` a noop if no socket is assigned', (done) => {
const server = createServer();
server.on('upgrade', (request, socket) => {
socket.on('end', socket.end);
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: foo'
];
socket.write(headers.concat('\r\n').join('\r\n'));
});
server.listen(() => {
const called = [];
const ws = new WebSocket(`ws://localhost:${server.address().port}`);
const duplex = WebSocket.createWebSocketStream(ws);
const final = duplex._final;
duplex._final = (callback) => {
called.push('final');
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.strictEqual(ws._socket, null);
final(callback);
};
duplex.on('error', (err) => {
called.push('error');
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'Invalid Sec-WebSocket-Accept header'
);
});
duplex.on('finish', () => {
called.push('finish');
});
duplex.on('close', () => {
assert.deepStrictEqual(called, ['final', 'error']);
server.close(done);
});
ws.on('upgrade', () => {
process.nextTick(() => {
duplex.end();
});
});
});
});
it('reemits errors', (done) => {
let duplexCloseEventEmitted = false;
let serverClientCloseEventEmitted = false;
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('error', (err) => {
assert.ok(err instanceof RangeError);
assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE');
assert.strictEqual(
err.message,
'Invalid WebSocket frame: invalid opcode 5'
);
duplex.on('close', () => {
duplexCloseEventEmitted = true;
if (serverClientCloseEventEmitted) wss.close(done);
});
});
});
wss.on('connection', (ws) => {
ws._socket.write(Buffer.from([0x85, 0x00]));
ws.on('close', (code, reason) => {
assert.strictEqual(code, 1002);
assert.deepStrictEqual(reason, EMPTY_BUFFER);
serverClientCloseEventEmitted = true;
if (duplexCloseEventEmitted) wss.close(done);
});
});
});
it('does not swallow errors that may occur while destroying', (done) => {
const frame = Buffer.concat(
Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), {
fin: true,
rsv1: true,
opcode: 0x02,
mask: false,
readOnly: false
})
);
const wss = new WebSocket.Server(
{
perMessageDeflate: true,
port: 0
},
() => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('error', (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(err.code, 'Z_DATA_ERROR');
assert.strictEqual(err.errno, -3);
duplex.on('close', () => {
wss.close(done);
});
});
let bytesRead = 0;
ws.on('open', () => {
ws._socket.on('data', (chunk) => {
bytesRead += chunk.length;
if (bytesRead === frame.length) duplex.destroy();
});
});
}
);
wss.on('connection', (ws) => {
ws._socket.write(frame);
});
});
it("does not suppress the throwing behavior of 'error' events", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
createWebSocketStream(ws);
});
wss.on('connection', (ws) => {
ws._socket.write(Buffer.from([0x85, 0x00]));
});
assert.strictEqual(process.listenerCount('uncaughtException'), 1);
const [listener] = process.listeners('uncaughtException');
process.removeAllListeners('uncaughtException');
process.once('uncaughtException', (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'Invalid WebSocket frame: invalid opcode 5'
);
process.on('uncaughtException', listener);
wss.close(done);
});
});
it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const events = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('end', () => {
events.push('end');
assert.ok(duplex.destroyed);
});
duplex.on('close', () => {
assert.deepStrictEqual(events, ['finish', 'end']);
wss.close(done);
});
duplex.on('finish', () => {
events.push('finish');
assert.ok(!duplex.destroyed);
assert.ok(duplex.readable);
duplex.resume();
});
ws.on('close', () => {
duplex.end();
});
});
wss.on('connection', (ws) => {
ws.send('foo');
ws.close();
});
});
it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const events = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('end', () => {
events.push('end');
assert.ok(!duplex.destroyed);
assert.ok(duplex.writable);
duplex.end();
});
duplex.on('close', () => {
assert.deepStrictEqual(events, ['end', 'finish']);
wss.close(done);
});
duplex.on('finish', () => {
events.push('finish');
});
duplex.resume();
});
wss.on('connection', (ws) => {
ws.close();
});
});
it('handles backpressure (1/3)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
// eslint-disable-next-line no-unused-vars
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});
wss.on('connection', (ws) => {
const duplex = createWebSocketStream(ws);
duplex.resume();
duplex.on('drain', () => {
duplex.on('close', () => {
wss.close(done);
});
duplex.end();
});
const chunk = randomBytes(1024);
let ret;
do {
ret = duplex.write(chunk);
} while (ret !== false);
});
});
it('handles backpressure (2/3)', (done) => {
const wss = new WebSocket.Server(
{ port: 0, perMessageDeflate: true },
() => {
const called = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
const read = duplex._read;
duplex._read = () => {
duplex._read = read;
called.push('read');
assert.ok(ws._receiver._writableState.needDrain);
read();
assert.ok(ws._socket.isPaused());
};
ws.on('open', () => {
ws._socket.on('pause', () => {
duplex.resume();
});
ws._receiver.on('drain', () => {
called.push('drain');
assert.ok(!ws._socket.isPaused());
duplex.end();
});
const opts = {
fin: true,
opcode: 0x02,
mask: false,
readOnly: false
};
const list = [
...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
];
// This hack is used because there is no guarantee that more than
// 16 KiB will be sent as a single TCP packet.
ws._socket.push(Buffer.concat(list));
});
duplex.on('close', () => {
assert.deepStrictEqual(called, ['read', 'drain']);
wss.close(done);
});
}
);
});
it('handles backpressure (3/3)', (done) => {
const wss = new WebSocket.Server(
{ port: 0, perMessageDeflate: true },
() => {
const called = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
const read = duplex._read;
duplex._read = () => {
called.push('read');
assert.ok(!ws._receiver._writableState.needDrain);
read();
assert.ok(!ws._socket.isPaused());
duplex.end();
};
ws.on('open', () => {
ws._receiver.on('drain', () => {
called.push('drain');
assert.ok(ws._socket.isPaused());
duplex.resume();
});
const opts = {
fin: true,
opcode: 0x02,
mask: false,
readOnly: false
};
const list = [
...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
];
ws._socket.push(Buffer.concat(list));
});
duplex.on('close', () => {
assert.deepStrictEqual(called, ['drain', 'read']);
wss.close(done);
});
}
);
});
it('can be destroyed (1/2)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const error = new Error('Oops');
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('error', (err) => {
assert.strictEqual(err, error);
duplex.on('close', () => {
wss.close(done);
});
});
ws.on('open', () => {
duplex.destroy(error);
});
});
});
it('can be destroyed (2/2)', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
duplex.on('close', () => {
wss.close(done);
});
ws.on('open', () => {
duplex.destroy();
});
});
});
it('converts text messages to strings in readable object mode', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const events = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws, { readableObjectMode: true });
duplex.on('data', (data) => {
events.push('data');
assert.strictEqual(data, 'foo');
});
duplex.on('end', () => {
events.push('end');
duplex.end();
});
duplex.on('close', () => {
assert.deepStrictEqual(events, ['data', 'end']);
wss.close(done);
});
});
wss.on('connection', (ws) => {
ws.send('foo');
ws.close();
});
});
it('resumes the socket if `readyState` is `CLOSING`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const duplex = createWebSocketStream(ws);
ws.on('message', () => {
assert.ok(ws._socket.isPaused());
duplex.on('close', () => {
wss.close(done);
});
duplex.end();
process.nextTick(() => {
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
duplex.resume();
});
});
});
wss.on('connection', (ws) => {
ws.send(randomBytes(16 * 1024));
});
});
});
});