Source code
Revision control
Copy as Markdown
Other Tools
var assert = require('assert');
// The Connection class
// ====================
// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
// stream (TCP stream). It operates by sending and receiving frames and is implemented as a
// [Flow](flow.html) subclass.
var Flow = require('./flow').Flow;
exports.Connection = Connection;
// Public API
// ----------
// * **new Connection(log, firstStreamId, settings)**: create a new Connection
//
// * **Event: 'error' (type)**: signals a connection level error made by the other end
//
// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
// code other than NO_ERROR
//
// * **Event: 'stream' (stream)**: signals that there's an incoming stream
//
// * **createStream(): stream**: initiate a new stream
//
// * **set(settings, callback)**: change the value of one or more settings according to the
// key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
//
// * **ping([callback])**: send a ping and call callback when the answer arrives
//
// * **close([error])**: close the connection, sending a GOAWAY frame with the given error code
//   (NO_ERROR when omitted)
// Constructor
// -----------
// The main aspects of managing the connection are:
// Builds a Connection on top of the Flow base class. The initializers below run in a fixed order:
// `_initializeFlowControl` touches `this._streamIds[0]`, which `_initializeStreamManagement`
// creates, and `_initializeSettingsManagement` already sends the first SETTINGS frame.
//
// * `log`: logger object; a child logger tagged with `component: 'connection'` is derived from it
// * `firstStreamId`: the ID that the first locally initiated stream will get
// * `settings`: the settings to send in the first SETTINGS frame (`defaultSettings` when falsy)
function Connection(log, firstStreamId, settings) {
// * initializing the base class
//   NOTE(review): the `0` is presumably the flow control ID of the connection itself - confirm
//   against the Flow constructor
Flow.call(this, 0);
// * logging: every method uses the common logger object
this._log = log.child({ component: 'connection' });
// * stream management
this._initializeStreamManagement(firstStreamId);
// * lifecycle management
this._initializeLifecycleManagement();
// * flow control
this._initializeFlowControl();
// * settings management
this._initializeSettingsManagement(settings);
// * multiplexing
this._initializeMultiplexing();
}
// Connection inherits from Flow; `constructor` is restored as a non-enumerable property.
Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
// Overview
// --------
// | ^ | ^
// v | v |
// +--------------+ +--------------+
// +---| stream1 |---| stream2 |---- .... ---+
// | | +----------+ | | +----------+ | |
// | | | stream1. | | | | stream2. | | |
// | +-| upstream |-+ +-| upstream |-+ |
// | +----------+ +----------+ |
// | | ^ | ^ |
// | v | v | |
// | +-----+-------------+-----+-------- .... |
// | ^ | | | |
// | | v | | |
// | +--------------+ | | |
// | | stream0 | | | |
// | | connection | | | |
// | | management | multiplexing |
// | +--------------+ flow control |
// | | ^ |
// | _read() | | _write() |
// | v | |
// | +------------+ +-----------+ |
// | |output queue| |input queue| |
// +----------------+------------+-+-----------+-----------------+
// | ^
// read() | | write()
// v |
// Stream management
// -----------------
var Stream = require('./stream').Stream;
// Initialization:
// Sets up the stream registry, the stream ID bookkeeping and the outbound stream slot pool.
//
// * `firstStreamId`: the ID that the first locally initiated stream will receive
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
  // * Stream 0 is a stand-in object: frames written to its `upstream` are handed over to
  //   `_writeControlFrame`
  var controlStream = { upstream: { write: this._writeControlFrame.bind(this) } };

  // * streams are stored in two data structures:
  //   * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames
  //   * `_streamPriorities` is a priority -> [stream] map of the streams that are allowed to send
  //     frames
  this._streamIds = [controlStream];
  this._streamPriorities = [];

  // * The last inbound stream ID and the next outbound stream ID
  this._lastIncomingStream = 0;
  this._nextStreamId = firstStreamId;

  // * By default, the number of concurrent outbound streams is not limited. The limit is changed
  //   when the SETTINGS_MAX_CONCURRENT_STREAMS setting is received.
  this._streamLimit = Infinity;
  this._streamSlotsFree = Infinity;
  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
};
// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
// broadcasts the message by creating an event on it.
// Handles a frame that arrived on stream 0 (it is installed as the `write` method of the stand-in
// control stream). Connection level frames are re-broadcast as an event named after the frame
// type; anything else is reported as a PROTOCOL_ERROR.
//
// * `frame`: the incoming frame; only its `type` is inspected here
Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
  // `indexOf` compares strictly, so every type is matched the same way (the previous code mixed
  // `===` with a loose `==` for 'ORIGIN').
  var connectionLevelTypes = ['SETTINGS', 'PING', 'GOAWAY', 'WINDOW_UPDATE', 'ALTSVC', 'ORIGIN'];

  if (connectionLevelTypes.indexOf(frame.type) !== -1) {
    this._log.debug({ frame: frame }, 'Receiving connection level frame');
    this.emit(frame.type, frame);
  } else {
    this._log.error({ frame: frame }, 'Invalid connection level frame');
    this.emit('error', 'PROTOCOL_ERROR');
  }
};
// Methods to manage the stream slot pool:
// Applies a new limit on the number of concurrent outbound streams (registered as the listener of
// 'RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS') and emits 'wakeup' if the pool was exhausted and
// the limit has been raised.
//
// * `newStreamLimit`: the new limit; `Infinity` means unlimited
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
  var oldStreamLimit = this._streamLimit;
  var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > oldStreamLimit);

  if (newStreamLimit === oldStreamLimit) {
    // Nothing to adjust. Handling this separately also keeps `Infinity - Infinity` (NaN) out of
    // the free slot counter.
  } else if (oldStreamLimit === Infinity) {
    // Both the limit and the free slot counter start out as Infinity, so the usual
    // `free += new - old` arithmetic would give `Infinity + (new - Infinity)`, which is NaN. Every
    // later comparison against NaN is false, so the limit would never be enforced.
    // NOTE(review): the number of streams already active cannot be recovered from an infinite
    // counter, so the pool restarts at the full new limit - streams opened before the first
    // limit arrived are not accounted for.
    this._streamSlotsFree = newStreamLimit;
  } else {
    this._streamSlotsFree += newStreamLimit - oldStreamLimit;
  }
  this._streamLimit = newStreamLimit;

  if (wakeup) {
    this.emit('wakeup');
  }
};
// Books a change in the number of active streams against the free slot pool.
//
// * `change`: how many streams became active (positive) or inactive (negative); a falsy value is
//   ignored entirely
Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
  if (!change) {
    return;
  }

  this._log.trace({ free: this._streamSlotsFree, change: change },
                  'Changing active stream count.');

  // A slot being released while the pool is exhausted is a reason to resume sending
  var poolWasExhausted = (this._streamSlotsFree === 0);
  this._streamSlotsFree -= change;
  if (poolWasExhausted && (change < 0)) {
    this.emit('wakeup');
  }
};
// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
// an outbound stream) consists of three steps:
//
// 1. var stream = new Stream(this._log, this);
// 2. this._allocateId(stream, id);
// 3. this._allocatePriority(stream);
// Allocating an ID to a stream
// Assigns an ID to `stream` and registers it in `this._streamIds`.
//
// * `stream`: the stream to register
// * `id`: the ID chosen by the peer for an incoming stream, or `undefined` for a locally
//   initiated stream
//
// Returns the allocated ID, or `undefined` (after emitting a PROTOCOL_ERROR) if the incoming ID
// is not acceptable.
Connection.prototype._allocateId = function _allocateId(stream, id) {
  if (id === undefined) {
    // * initiated stream without definite ID: take the next one of ours
    id = this._nextStreamId;
    this._nextStreamId += 2;
  } else {
    // * incoming stream: a legitimate ID is larger than any previous incoming one and has a
    //   different parity than ours
    var isNewest = id > this._lastIncomingStream;
    var isLegitimate = isNewest && ((id - this._nextStreamId) % 2 !== 0);

    if (!isLegitimate) {
      this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
                      'Invalid incoming stream ID.');
      this.emit('error', 'PROTOCOL_ERROR');
      return undefined;
    }
    this._lastIncomingStream = id;
  }

  assert(!(id in this._streamIds));

  // * adding to `this._streamIds`
  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
  stream.id = id;
  this._streamIds[id] = stream;
  this.emit('new_stream', stream, id);

  // * forwarding connection errors from streams
  var forwardError = this.emit.bind(this, 'error');
  stream.on('connectionError', forwardError);

  return id;
};
// Allocating a priority to a stream, and managing priority changes
// Puts `stream` into the priority bucket given by its current `_priority`, subscribes to its
// priority changes and to its `upstream` becoming readable, then emits 'wakeup'.
//
// * `stream`: the stream that becomes eligible for sending
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
  var onPriorityChange = this._reprioritize.bind(this, stream);
  var wakeUp = this.emit.bind(this, 'wakeup');

  this._log.trace({ s: stream }, 'Allocating priority for stream.');
  this._insert(stream, stream._priority);

  stream.on('priority', onPriorityChange);
  stream.upstream.on('readable', wakeUp);

  this.emit('wakeup');
};
// Appends `stream` to the bucket of `priority` in `this._streamPriorities`, creating the bucket
// first if there is none yet.
Connection.prototype._insert = function _insert(stream, priority) {
  var buckets = this._streamPriorities;
  if (!(priority in buckets)) {
    buckets[priority] = [];
  }
  buckets[priority].push(stream);
};
// Moves `stream` from the bucket of its current `_priority` to the bucket of `priority`. The
// stream must be present in its current bucket (asserted); a bucket that becomes empty is removed.
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
  var oldBucket = this._streamPriorities[stream._priority];
  var position = oldBucket.indexOf(stream);
  assert(position !== -1);

  oldBucket.splice(position, 1);
  if (!oldBucket.length) {
    delete this._streamPriorities[stream._priority];
  }

  this._insert(stream, priority);
};
// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
// a previously nonexistent stream.
// Creates an *inbound* stream for the peer-chosen `id`, registers it and announces it with a
// 'stream' event. Returns the new stream. The result of `_allocateId` is not inspected here, so
// the stream is returned even when the ID was rejected.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
  this._log.debug({ stream_id: id }, 'New incoming stream.');

  var incoming = new Stream(this._log, this);

  // * registration: first the ID, then the priority bucket
  this._allocateId(incoming, id);
  this._allocatePriority(incoming);

  // * letting the user of the connection know about it
  this.emit('stream', incoming, id);

  return incoming;
};
// Creating an *outbound* stream
// Creates an *outbound* stream and returns it.
//
// * The stream is only put into a priority bucket here; no ID is allocated in this method. The
//   ID is assigned in `_send`, when the first frame of the stream is forwarded.
Connection.prototype.createStream = function createStream() {
  this._log.trace('Creating new outbound stream.');

  var outgoing = new Stream(this._log, this);
  this._allocatePriority(outgoing);

  return outgoing;
};
// Multiplexing
// ------------
// Multiplexing initialization: resets the scheduling flags and turns every 'window_update' event
// into a 'wakeup' event.
Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
  // * no deferred `_send` is pending, and nothing has been received yet
  this._firstFrameReceived = false;
  this._sendScheduled = false;

  // * a window update is a reason to try sending again
  var wakeUp = this.emit.bind(this, 'wakeup');
  this.on('window_update', wakeUp);
};
// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
// by child classes. It reads frames from streams and pushes them to the output buffer.
// Pulls frames out of the `upstream` of the registered streams and pushes them into the output of
// the connection.
//
// * `immediate`: when falsy, the call only schedules a single deferred run through `setImmediate`
//   (further calls before that run are collapsed into it); when truthy, the forwarding loop below
//   is executed right away.
Connection.prototype._send = function _send(immediate) {
// * Do not do anything if the connection is already closed
if (this._closed) {
return;
}
// * Collapsing multiple calls in a turn into a single deferred call
if (immediate) {
this._sendScheduled = false;
} else {
if (!this._sendScheduled) {
this._sendScheduled = true;
setImmediate(this._send.bind(this, true));
}
return;
}
this._log.trace('Starting forwarding frames from streams.');
// * Looping through priority `bucket`s in priority order.
priority_loop:
for (var priority in this._streamPriorities) {
// `bucket` is a local working reference: it is replaced by `nextBucket` after every round, the
// array stored in `this._streamPriorities` itself is not modified by this method.
var bucket = this._streamPriorities[priority];
var nextBucket = [];
// * Forwarding frames from buckets with round-robin scheduling.
// 1. pulling out frame
// 2. if there's no frame, skip this stream
// 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
// this stream
// 4. adding stream to the bucket of the next round
// 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
// 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
// 7. forwarding the frame, changing `streamCount` as appropriate
// 8. stepping to the next stream if there's still more frame needed in the output buffer
// 9. switching to the bucket of the next round
// The rounds for a bucket end when a round forwards nothing (`nextBucket` stays empty).
while (bucket.length > 0) {
for (var index = 0; index < bucket.length; index++) {
var stream = bucket[index];
// NOTE(review): the argument is the connection window when it is positive and -1 otherwise;
// presumably a size limit for the frame to read - confirm against the upstream's `read`
var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
if (!frame) {
continue;
} else if (frame.count_change > this._streamSlotsFree) {
// Not enough free stream slots for this frame: put it back and leave the stream out of the
// next round
stream.upstream.unshift(frame);
continue;
}
nextBucket.push(stream);
if (frame.stream === undefined) {
// An outbound stream gets its ID here, when its first frame is forwarded
frame.stream = stream.id || this._allocateId(stream);
}
if (frame.type === 'PUSH_PROMISE') {
// The promised stream object is registered, then replaced by its freshly allocated ID
this._allocatePriority(frame.promised_stream);
frame.promised_stream = this._allocateId(frame.promised_stream);
}
this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
// `var` is function scoped, so `moreNeeded` is also visible after the loops; it stays
// `undefined` if this line was never reached during the run
var moreNeeded = this.push(frame);
this._changeStreamCount(frame.count_change);
assert(moreNeeded !== null); // The frame shouldn't be unforwarded
if (moreNeeded === false) {
// The output does not want more frames: stop processing every bucket
break priority_loop;
}
}
bucket = nextBucket;
nextBucket = [];
}
}
// * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
if (moreNeeded === undefined) {
this.once('wakeup', this._send.bind(this));
}
this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
};
// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
// implemented by child classes. It forwards the given frame to the appropriate stream:
// Forwards an incoming frame to the stream it belongs to.
//
// * `frame`: the incoming frame
// * `done`: called once the frame has been written to the stream. It is *not* called when the
//   frame is rejected and the connection gets closed with PROTOCOL_ERROR.
Connection.prototype._receive = function _receive(frame, done) {
  this._log.trace({ frame: frame }, 'Forwarding incoming frame');

  // * first frame needs to be checked by the `_onFirstFrameReceived` method
  if (!this._firstFrameReceived) {
    this._firstFrameReceived = true;
    this._onFirstFrameReceived(frame);
  }

  // * sanity checking before a stream is created: some frame types are only valid on the
  //   connection (stream 0), others only on a real stream. WINDOW_UPDATE can be on either.
  var type = frame.type;
  var connectionOnly = (type == 'SETTINGS') || (type == 'PING') || (type == 'GOAWAY');
  var streamOnly = (type == 'DATA') || (type == 'HEADERS') || (type == 'PRIORITY') ||
                   (type == 'RST_STREAM') || (type == 'PUSH_PROMISE') || (type == 'CONTINUATION');
  var misplaced = connectionOnly ? (frame.stream != 0) : (streamOnly && (frame.stream == 0));
  if (misplaced) {
    this.close('PROTOCOL_ERROR');
    return;
  }

  // * gets the appropriate stream from the stream registry, or creates one if it's not in
  //   `this._streamIds`
  var target = this._streamIds[frame.stream] || this._createIncomingStream(frame.stream);

  // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
  if (type === 'PUSH_PROMISE') {
    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
  }

  // * lets the receiver of the frame report changes of the active stream count
  frame.count_change = this._changeStreamCount.bind(this);

  // * and writes it to the `upstream` of the stream
  target.upstream.write(frame);

  done();
};
// Settings management
// -------------------
// The settings sent in the first SETTINGS frame when the constructor gets none: an empty set.
var defaultSettings = {};
// Settings management initialization:
// Settings management initialization. Sends the first SETTINGS frame right away.
//
// * `settings`: the initial settings; `defaultSettings` is sent when it is falsy
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
  var initialSettings = settings || defaultSettings;

  // * The callback queue for setting acknowledgements has to exist before `set` is called
  this._settingsAckCallbacks = [];

  // * Sending the initial settings
  this._log.debug({ settings: settings },
                  'Sending the first SETTINGS frame as part of the connection header.');
  this.set(initialSettings);

  // * Forwarding SETTINGS frames to the `_receiveSettings` method, and checking every received
  //   maximum frame size
  this.on('SETTINGS', this._receiveSettings);
  this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize);
};
// * Checking that the first frame the other endpoint sends is SETTINGS
// Checks that the first frame the other endpoint sends is a SETTINGS frame on stream 0. Anything
// else is reported as a PROTOCOL_ERROR.
//
// * `frame`: the first frame received on the connection
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
  var isConnectionHeader = (frame.stream === 0) && (frame.type === 'SETTINGS');

  if (!isConnectionHeader) {
    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    this.emit('error', 'PROTOCOL_ERROR');
    return;
  }

  this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
};
// Handling of incoming SETTINGS frames.
// Handling of incoming SETTINGS frames.
//
// * `frame`: the SETTINGS frame; `frame.flags.ACK` tells an acknowledgement from a change request
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
  // * An ACK belongs to the oldest unacknowledged `set` call: its callback is taken off the queue
  //   and called
  if (frame.flags.ACK) {
    var onAcknowledged = this._settingsAckCallbacks.shift();
    if (onAcknowledged) {
      onAcknowledged();
    }
    return;
  }

  // * A setting change request is acknowledged first (unless the connection is closed already)
  if (!this._closed) {
    var acknowledgement = {
      type: 'SETTINGS',
      flags: { ACK: true },
      stream: 0,
      settings: {}
    };
    this.push(acknowledgement);
  }

  // * then every received setting is announced with a 'RECEIVING_<name>' event
  var received = frame.settings;
  for (var name in received) {
    this.emit('RECEIVING_' + name, received[name]);
  }
};
// Validates a received SETTINGS_MAX_FRAME_SIZE value (registered as the listener of
// 'RECEIVING_SETTINGS_MAX_FRAME_SIZE'). Acceptable values are 0x4000 ... 0xffffff, inclusive.
//
// * `value`: the received setting value
//
// An invalid value is reported as an 'error' event with the 'PROTOCOL_ERROR' type. The event used
// to be emitted without any error type, unlike every other 'error' emitted by the connection.
Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) {
  var smallestAllowed = 0x4000;
  var largestAllowed = 0x00ffffff;

  // Written as a positive range test so that NaN and other non-numeric values are rejected too
  var valid = (value >= smallestAllowed) && (value <= largestAllowed);
  if (!valid) {
    this._log.fatal('Received invalid value for max frame size: ' + value);
    this.emit('error', 'PROTOCOL_ERROR');
  }
};
// Changing one or more settings value and sending out a SETTINGS frame
// Changes one or more settings by sending out a SETTINGS frame.
//
// * `settings`: key-value pairs of the settings to change
// * `callback`: optional; called once the peer has acknowledged the change
//
// An event named 'SENDING_<name>' is emitted for every setting when the frame is sent, and one
// named 'ACKNOWLEDGED_<name>' when the change is acknowledged.
Connection.prototype.set = function set(settings, callback) {
  var connection = this;

  // Emits one `<prefix><name>` event per setting, with the value of the setting
  function announce(prefix) {
    for (var name in settings) {
      connection.emit(prefix + name, settings[name]);
    }
  }

  // * Queueing what has to happen when the change is acknowledged
  this._settingsAckCallbacks.push(function() {
    announce('ACKNOWLEDGED_');
    if (callback) {
      callback();
    }
  });

  // * Sending out the SETTINGS frame
  var request = {
    type: 'SETTINGS',
    flags: { ACK: false },
    stream: 0,
    settings: settings
  };
  this.push(request);

  announce('SENDING_');
};
// Lifecycle management
// --------------------
// The main responsibilities of lifecycle management code:
//
// * keeping the connection alive by
// * sending PINGs when the connection is idle
// * answering PINGs
// * ending the connection
// Lifecycle management initialization: the connection starts out open, without any outstanding
// ping, and with PING and GOAWAY frames routed to their handlers.
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
  // * state
  this._closed = false;
  this._pings = {};

  // * frame handlers
  this.on('PING', this._receivePing);
  this.on('GOAWAY', this._receiveGoaway);
};
// Generating a string of length 16 with random hexadecimal digits
// Generates a string of 16 random hexadecimal digits that is not yet a key of `this._pings`.
Connection.prototype._generatePingId = function _generatePingId() {
  var candidate;
  do {
    var digits = [];
    while (digits.length < 16) {
      digits.push(Math.floor(Math.random() * 16).toString(16));
    }
    candidate = digits.join('');
  } while (candidate in this._pings); // retry in the unlikely case of a collision
  return candidate;
};
// Sending a ping and calling `callback` when the answer arrives
// Sends a PING frame carrying a freshly generated ID.
//
// * `callback`: optional; stored under the ID so that it can be called when the answer arrives
Connection.prototype.ping = function ping(callback) {
  var id = this._generatePingId();
  var payload = Buffer.from(id, 'hex');

  // * remembering who waits for the answer
  this._pings[id] = callback;

  // * sending the frame
  this._log.debug({ data: payload }, 'Sending PING.');
  var request = {
    type: 'PING',
    flags: { ACK: false },
    stream: 0,
    data: payload
  };
  this.push(request);
};
// Answering pings
// Handles an incoming PING frame: a ping of the peer is answered, an answer to one of our pings
// is matched against `this._pings`.
//
// * `frame`: the PING frame; `frame.flags.ACK` marks an answer, `frame.data` is the payload
Connection.prototype._receivePing = function _receivePing(frame) {
  // * a ping from the peer: echo the payload back with the ACK flag set
  if (!frame.flags.ACK) {
    this._log.debug({ data: frame.data }, 'Answering PING.');
    this.push({
      type: 'PING',
      flags: { ACK: true },
      stream: 0,
      data: frame.data
    });
    return;
  }

  // * an answer: it must belong to a ping we have sent
  var id = frame.data.toString('hex');
  if (!(id in this._pings)) {
    this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
    return;
  }

  // * the callback (if any) is called before the ping is forgotten
  this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
  var onAnswer = this._pings[id];
  if (onAnswer) {
    onAnswer();
  }
  delete this._pings[id];
};
// Sends an ORIGIN frame on stream 0.
//
// * `originList`: the value placed in the `originList` field of the frame
Connection.prototype.originFrame = function originFrame(originList) {
  this._log.debug(originList, 'emitting origin frame');

  var frame = {
    type: 'ORIGIN',
    flags: {},
    stream: 0,
    originList: originList
  };
  this.push(frame);
};
// Terminating the connection
// Terminates the connection: sends a GOAWAY frame, ends the output and marks the connection as
// closed. Closing an already closed connection only logs a warning.
//
// * `error`: optional error code for the GOAWAY frame; 'NO_ERROR' is sent when it is falsy
Connection.prototype.close = function close(error) {
  if (this._closed) {
    this._log.warn('Trying to close an already closed connection');
    return;
  }

  this._log.debug({ error: error }, 'Closing the connection');

  var goaway = {
    type: 'GOAWAY',
    flags: {},
    stream: 0,
    last_stream: this._lastIncomingStream,
    error: error || 'NO_ERROR'
  };
  this.push(goaway);
  this.push(null);

  this._closed = true;
};
// Handles a GOAWAY frame of the peer: ends the output, marks the connection as closed, and
// reports every error code other than 'NO_ERROR' with a 'peerError' event.
//
// * `frame`: the GOAWAY frame; only its `error` field is used
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
  var reason = frame.error;

  this._log.debug({ error: reason }, 'Other end closed the connection');
  this.push(null);
  this._closed = true;

  if (reason === 'NO_ERROR') {
    return;
  }
  this.emit('peerError', reason);
};
// Flow control
// ------------
// Flow control initialization. Relies on `this._streamIds[0]` being in place already.
Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
  // * The stand-in object of stream 0 has no window of its own to adjust, so it gets a
  //   `setInitialWindow` that does nothing
  this._streamIds[0].upstream.setInitialWindow = function noop() {};

  // * Handling of initial window size of individual streams: every newly registered stream gets
  //   the value that is current at the time of its registration
  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
  this.on('new_stream', function(stream) {
    var initialWindow = this._initialStreamWindowSize;
    stream.upstream.setInitialWindow(initialWindow);
  });

  // * The peer can change the value in a SETTINGS frame
  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
};
// The default initial flow control window of each stream is 65535 bytes. `_initializeFlowControl`
// stores this value in `_initialStreamWindowSize`, which is then applied to every newly
// registered stream.
var INITIAL_STREAM_WINDOW_SIZE = 65535;
// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
// streams, which is done by the `_setInitialStreamWindowSize` method. The window size has to be
// modified by the difference between the new value and the old value.
// Changes the initial window size of the streams (registered as the listener of
// 'RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE').
//
// * `size`: the new initial window size; it is stored for future streams and handed to the
//   `upstream` of every stream currently present in `this._streamIds`
//
// Once the stored size is `Infinity` (flow control turned off), any other value is rejected with
// a FLOW_CONTROL_ERROR.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
  var flowControlIsOff = (this._initialStreamWindowSize === Infinity);

  if (flowControlIsOff && (size !== Infinity)) {
    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.debug({ size: size }, 'Changing stream initial window size.');
  this._initialStreamWindowSize = size;
  this._streamIds.forEach(function applyToStream(stream) {
    stream.upstream.setInitialWindow(size);
  });
};