2288 lines
61 KiB
JavaScript
2288 lines
61 KiB
JavaScript
|
// @ts-check
|
||
|
|
||
|
'use strict'
|
||
|
|
||
|
/* global WebAssembly */
|
||
|
|
||
|
const assert = require('assert')
|
||
|
const net = require('net')
|
||
|
const http = require('http')
|
||
|
const { pipeline } = require('stream')
|
||
|
const util = require('./core/util')
|
||
|
const timers = require('./timers')
|
||
|
const Request = require('./core/request')
|
||
|
const DispatcherBase = require('./dispatcher-base')
|
||
|
const {
|
||
|
RequestContentLengthMismatchError,
|
||
|
ResponseContentLengthMismatchError,
|
||
|
InvalidArgumentError,
|
||
|
RequestAbortedError,
|
||
|
HeadersTimeoutError,
|
||
|
HeadersOverflowError,
|
||
|
SocketError,
|
||
|
InformationalError,
|
||
|
BodyTimeoutError,
|
||
|
HTTPParserError,
|
||
|
ResponseExceededMaxSizeError,
|
||
|
ClientDestroyedError
|
||
|
} = require('./core/errors')
|
||
|
const buildConnector = require('./core/connect')
|
||
|
const {
|
||
|
kUrl,
|
||
|
kReset,
|
||
|
kServerName,
|
||
|
kClient,
|
||
|
kBusy,
|
||
|
kParser,
|
||
|
kConnect,
|
||
|
kBlocking,
|
||
|
kResuming,
|
||
|
kRunning,
|
||
|
kPending,
|
||
|
kSize,
|
||
|
kWriting,
|
||
|
kQueue,
|
||
|
kConnected,
|
||
|
kConnecting,
|
||
|
kNeedDrain,
|
||
|
kNoRef,
|
||
|
kKeepAliveDefaultTimeout,
|
||
|
kHostHeader,
|
||
|
kPendingIdx,
|
||
|
kRunningIdx,
|
||
|
kError,
|
||
|
kPipelining,
|
||
|
kSocket,
|
||
|
kKeepAliveTimeoutValue,
|
||
|
kMaxHeadersSize,
|
||
|
kKeepAliveMaxTimeout,
|
||
|
kKeepAliveTimeoutThreshold,
|
||
|
kHeadersTimeout,
|
||
|
kBodyTimeout,
|
||
|
kStrictContentLength,
|
||
|
kConnector,
|
||
|
kMaxRedirections,
|
||
|
kMaxRequests,
|
||
|
kCounter,
|
||
|
kClose,
|
||
|
kDestroy,
|
||
|
kDispatch,
|
||
|
kInterceptors,
|
||
|
kLocalAddress,
|
||
|
kMaxResponseSize,
|
||
|
kHTTPConnVersion,
|
||
|
// HTTP2
|
||
|
kHost,
|
||
|
kHTTP2Session,
|
||
|
kHTTP2SessionState,
|
||
|
kHTTP2BuildRequest,
|
||
|
kHTTP2CopyHeaders,
|
||
|
kHTTP1BuildRequest
|
||
|
} = require('./core/symbols')
|
||
|
|
||
|
/** @type {import('http2')} */
|
||
|
let http2
|
||
|
try {
|
||
|
http2 = require('http2')
|
||
|
} catch {
|
||
|
// @ts-ignore
|
||
|
http2 = { constants: {} }
|
||
|
}
|
||
|
|
||
|
const {
|
||
|
constants: {
|
||
|
HTTP2_HEADER_AUTHORITY,
|
||
|
HTTP2_HEADER_METHOD,
|
||
|
HTTP2_HEADER_PATH,
|
||
|
HTTP2_HEADER_SCHEME,
|
||
|
HTTP2_HEADER_CONTENT_LENGTH,
|
||
|
HTTP2_HEADER_EXPECT,
|
||
|
HTTP2_HEADER_STATUS
|
||
|
}
|
||
|
} = http2
|
||
|
|
||
|
// Experimental
|
||
|
let h2ExperimentalWarned = false
|
||
|
|
||
|
const FastBuffer = Buffer[Symbol.species]
|
||
|
|
||
|
const kClosedResolve = Symbol('kClosedResolve')
|
||
|
|
||
|
// Diagnostics channels used by the client. When `diagnostics_channel`
// cannot be loaded, every entry falls back to an inert stub that only
// reports `hasSubscribers: false`, so call sites can test that flag
// without caring whether the module exists.
const channels = {}

try {
  const dc = require('diagnostics_channel')
  for (const name of ['sendHeaders', 'beforeConnect', 'connectError', 'connected']) {
    channels[name] = dc.channel(`undici:client:${name}`)
  }
} catch {
  for (const name of ['sendHeaders', 'beforeConnect', 'connectError', 'connected']) {
    channels[name] = { hasSubscribers: false }
  }
}
|
||
|
|
||
|
/**
 * Dispatcher bound to a single origin. Requests are kept in one queue
 * (`kQueue`) that is partitioned by `kRunningIdx` / `kPendingIdx` into
 * completed, running and pending sections.
 *
 * @type {import('../types/client').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector (unless a connector function
   * was supplied) and initialises the per-client state.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or has an
   *   invalid value. Checks run in the order they are written below.
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    allowH2,
    maxConcurrentStreams
  } = {}) {
    super()

    // Options that are no longer supported: fail loudly and point at the
    // replacement instead of silently ignoring them.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    // -1 is accepted here; it is the value stored below for "no limit".
    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
    }

    // `connect` may be a ready-made connector function, or an options object
    // (or nothing) that is merged last so it overrides the defaults built here.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    // Use caller supplied interceptors when given as an array, otherwise
    // install the default redirect interceptor.
    this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
      ? interceptors.Client
      : [createRedirectInterceptor({ maxRedirections })]
    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kSocket] = null
    this[kPipelining] = pipelining != null ? pipelining : 1
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress != null ? localAddress : null
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
    this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    // Any value that is not greater than -1 (including undefined) means "no limit".
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kHTTPConnVersion] = 'h1'

    // HTTP/2
    this[kHTTP2Session] = null
    this[kHTTP2SessionState] = !allowH2
      ? null
      : {
        // streams: null, // Fixed queue of streams - For future support of `push`
          openStreams: 0, // Keep track of them to decide whether or not to unref the session
          maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
        }
    this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining setting. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Updates the pipelining setting and immediately tries to resume the queue. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests at or after kPendingIdx. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of requests between kRunningIdx and kPendingIdx. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Number of requests that are running or pending. */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when a socket exists, is not being connected and is not destroyed. */
  get [kConnected] () {
    return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
  }

  /**
   * Truthy when the socket is flagged as reset / writing / blocking, when the
   * queue already holds at least `pipelining` (minimum 1) requests, or when
   * there are pending requests.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    return (
      (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Builds a request for the active protocol version, appends it to the queue
   * and kicks the queue.
   *
   * @returns {boolean} `false` once kNeedDrain has been raised to 2, i.e. the
   *   caller should stop dispatching until drained.
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const request = this[kHTTPConnVersion] === 'h2'
      ? Request[kHTTP2BuildRequest](origin, opts, handler)
      : Request[kHTTP1BuildRequest](origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      process.nextTick(resume, this)
    } else {
      resume(this, true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves immediately when nothing is running or pending; otherwise parks
   * the resolver in kClosedResolve to be invoked elsewhere.
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (!this[kSize]) {
        resolve(null)
      } else {
        this[kClosedResolve] = resolve
      }
    })
  }

  /**
   * Fails every pending request with `err`, tears down the HTTP/2 session and
   * the socket (if any) and resolves once the socket has closed.
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      // Only the pending section is removed here; running requests stay queued.
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (this[kHTTP2Session] != null) {
        util.destroy(this[kHTTP2Session], err)
        this[kHTTP2Session] = null
        this[kHTTP2SessionState] = null
      }

      if (!this[kSocket]) {
        queueMicrotask(callback)
      } else {
        util.destroy(this[kSocket].on('close', callback), err)
      }

      resume(this)
    })
  }
}
|
||
|
|
||
|
/**
 * 'error' listener for the HTTP/2 session (`this`). Records the error on the
 * underlying socket and forwards it to the owning client.
 */
function onHttp2SessionError (err) {
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  const socket = this[kSocket]
  socket[kError] = err

  const owner = this[kClient]
  onError(owner, err)
}
|
||
|
|
||
|
/**
 * 'frameError' listener for the HTTP/2 session (`this`). Only errors reported
 * with `id === 0` are recorded on the socket and forwarded to the client;
 * any other id is ignored here.
 */
function onHttp2FrameError (type, code, id) {
  const frameError = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

  if (id !== 0) {
    return
  }

  this[kSocket][kError] = frameError
  onError(this[kClient], frameError)
}
|
||
|
|
||
|
/**
 * 'end' listener for the HTTP/2 session (`this`). Destroys the session first
 * and then its socket, each with its own SocketError instance.
 */
function onHttp2SessionEnd () {
  const session = this
  util.destroy(session, new SocketError('other side closed'))

  const socket = session[kSocket]
  util.destroy(socket, new SocketError('other side closed'))
}
|
||
|
|
||
|
/**
 * 'goaway' listener for the HTTP/2 session (`this`).
 *
 * Detaches the socket and session from the owning client, fails the affected
 * requests with an InformationalError, emits 'disconnect' and resumes the
 * client.
 *
 * Note: `this` is the session, not the client. All queue bookkeeping must go
 * through `client`; the session has no kPending / kQueue state.
 *
 * @param {number} code GOAWAY code, only used in the error message.
 */
function onHTTP2GoAway (code) {
  const client = this[kClient]
  const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
  client[kSocket] = null
  client[kHTTP2Session] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0) {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  // Whatever was running is now treated as pending again.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect',
    client[kUrl],
    [client],
    err
  )

  resume(client)
}
|
||
|
|
||
|
const constants = require('./llhttp/constants')
|
||
|
const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
|
||
|
const EMPTY_BUF = Buffer.alloc(0)
|
||
|
|
||
|
/**
 * Compiles and instantiates the llhttp WebAssembly parser.
 *
 * The SIMD build is tried first; if compiling it throws, the plain build is
 * compiled instead. The import object wires llhttp's callbacks to the parser
 * that is currently executing (`currentParser`), handing each callback a
 * zero-copy view into the chunk being parsed (`currentBufferRef`).
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  // Under jest the plain build is required eagerly, before any compilation.
  const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

  let wasmModule
  try {
    wasmModule = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64'))
  } catch (e) {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    wasmModule = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64'))
  }

  // Translates a (pointer, length) pair inside the wasm scratch buffer into a
  // view over the same bytes of the original chunk.
  const viewOf = (at, len) => {
    const start = at - currentBufferPtr + currentBufferRef.byteOffset
    return new FastBuffer(currentBufferRef.buffer, start, len)
  }

  const env = {
    /* eslint-disable camelcase */

    wasm_on_url: (p, at, len) => {
      /* istanbul ignore next */
      return 0
    },
    wasm_on_status: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      const chunk = viewOf(at, len)
      return currentParser.onStatus(chunk) || 0
    },
    wasm_on_message_begin: (p) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onMessageBegin() || 0
    },
    wasm_on_header_field: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      const chunk = viewOf(at, len)
      return currentParser.onHeaderField(chunk) || 0
    },
    wasm_on_header_value: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      const chunk = viewOf(at, len)
      return currentParser.onHeaderValue(chunk) || 0
    },
    wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
    },
    wasm_on_body: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      const chunk = viewOf(at, len)
      return currentParser.onBody(chunk) || 0
    },
    wasm_on_message_complete: (p) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onMessageComplete() || 0
    }

    /* eslint-enable camelcase */
  }

  return WebAssembly.instantiate(wasmModule, { env })
}
|
||
|
|
||
|
// Lazily instantiated llhttp WebAssembly instance, plus the promise that
// produces it. Compilation is kicked off at module load.
let llhttpInstance = null
let llhttpPromise = lazyllhttp()
// Attach a no-op rejection handler so a failed wasm compilation does not
// surface as an unhandled rejection at load time. `catch()` without a handler
// does not do this: the promise it returns rejects with the same reason and is
// itself unhandled. `llhttpPromise` still rejects for whoever awaits it later.
llhttpPromise.catch(() => {})

// State shared between Parser#execute and the wasm callbacks. It is only set
// for the duration of a single `llhttp_execute` call.
let currentParser = null
let currentBufferRef = null
// Size and address of the scratch buffer allocated inside wasm memory.
let currentBufferSize = 0
let currentBufferPtr = null

// Values for Parser#timeoutType.
const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
|
||
|
|
||
|
/**
 * HTTP/1.1 response parser for one socket, backed by the llhttp wasm
 * instance. llhttp invokes the `on*` methods while `execute` runs; their
 * return value is passed back to llhttp as the callback's result code.
 */
class Parser {
  /**
   * @param client Owning client; supplies the request queue and the limits.
   * @param socket Socket whose data is parsed.
   * @param {{ exports: object }} llhttp Instantiated llhttp wasm instance.
   */
  constructor (client, socket, { exports }) {
    assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)

    this.llhttp = exports
    this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
    this.client = client
    this.socket = socket
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null
    this.statusCode = null
    this.statusText = ''
    this.upgrade = false
    // Flat list of header buffers: [name, value, name, value, ...].
    this.headers = []
    this.headersSize = 0
    this.headersMaxSize = client[kMaxHeadersSize]
    this.shouldKeepAlive = false
    this.paused = false
    // Bound once because it is handed to request handlers as a callback.
    this.resume = this.resume.bind(this)

    this.bytesRead = 0

    // Raw values of the headers the parser itself needs to inspect.
    this.keepAlive = ''
    this.contentLength = ''
    this.connection = ''
    this.maxResponseSize = client[kMaxResponseSize]
  }

  /**
   * Arms, re-arms or clears the parser timeout.
   *
   * A new timer is only created when `value` differs from the current one;
   * otherwise the existing timer (if any) is refreshed.
   *
   * @param {number} value Delay; a falsy value disables the timeout.
   * @param {number} type One of the TIMEOUT_* constants.
   */
  setTimeout (value, type) {
    this.timeoutType = type
    if (value !== this.timeoutValue) {
      timers.clearTimeout(this.timeout)
      if (value) {
        this.timeout = timers.setTimeout(onParserTimeout, value, this)
        // istanbul ignore else: only for jest
        if (this.timeout.unref) {
          this.timeout.unref()
        }
      } else {
        this.timeout = null
      }
      this.timeoutValue = value
    } else if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }
  }

  /**
   * Resumes a paused parser: un-pauses llhttp, refreshes the body timeout and
   * continues with whatever data the socket has buffered.
   * No-op when the socket is destroyed or the parser is not paused.
   */
  resume () {
    if (this.socket.destroyed || !this.paused) {
      return
    }

    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_resume(this.ptr)

    assert(this.timeoutType === TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    this.paused = false
    this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
    this.readMore()
  }

  /**
   * Drains the socket into the parser until the socket has nothing buffered,
   * the parser pauses, or the parser has been destroyed (`ptr` cleared).
   */
  readMore () {
    while (!this.paused && this.ptr) {
      const chunk = this.socket.read()
      if (chunk === null) {
        break
      }
      this.execute(chunk)
    }
  }

  /**
   * Feeds one chunk to llhttp.
   *
   * The chunk is copied into a scratch buffer in wasm memory (grown in 4 KiB
   * steps when needed). Any error, including a protocol error reported by
   * llhttp, destroys the socket instead of propagating to the caller.
   *
   * @param {Buffer} data
   */
  execute (data) {
    assert(this.ptr != null)
    assert(currentParser == null)
    assert(!this.paused)

    const { socket, llhttp } = this

    if (data.length > currentBufferSize) {
      if (currentBufferPtr) {
        llhttp.free(currentBufferPtr)
      }
      currentBufferSize = Math.ceil(data.length / 4096) * 4096
      currentBufferPtr = llhttp.malloc(currentBufferSize)
    }

    new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)

    // Call `execute` on the wasm parser.
    // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
    // and finally the length of bytes to parse.
    // The return value is an error code or `constants.ERROR.OK`.
    try {
      let ret

      try {
        // The wasm callbacks locate the parser and the chunk through these
        // module-level references; always clear them afterwards.
        currentBufferRef = data
        currentParser = this
        ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
      } finally {
        currentParser = null
        currentBufferRef = null
      }

      // Position inside `data` at which llhttp stopped.
      const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

      // `subarray` is the non-deprecated equivalent of `Buffer#slice`; both
      // return a view over the same memory.
      if (ret === constants.ERROR.PAUSED_UPGRADE) {
        this.onUpgrade(data.subarray(offset))
      } else if (ret === constants.ERROR.PAUSED) {
        this.paused = true
        // Hand the unparsed remainder back to the socket for later.
        socket.unshift(data.subarray(offset))
      } else if (ret !== constants.ERROR.OK) {
        const ptr = llhttp.llhttp_get_error_reason(this.ptr)
        let message = ''
        /* istanbul ignore else: difficult to make a test case for */
        if (ptr) {
          // The reason is a NUL-terminated string in wasm memory.
          const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
          message =
            'Response does not match the HTTP/1.1 protocol (' +
            Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
            ')'
        }
        throw new HTTPParserError(message, constants.ERROR[ret], data.subarray(offset))
      }
    } catch (err) {
      util.destroy(socket, err)
    }
  }

  /** Frees the llhttp parser, clears the timeout and resets the pause flag. */
  destroy () {
    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_free(this.ptr)
    this.ptr = null

    timers.clearTimeout(this.timeout)
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null

    this.paused = false
  }

  /** llhttp callback: stores the status text. */
  onStatus (buf) {
    this.statusText = buf.toString()
  }

  /**
   * llhttp callback: start of a response.
   * @returns {number|undefined} -1 when the socket is destroyed or no request
   *   is waiting at the head of the running section.
   */
  onMessageBegin () {
    const { socket, client } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    if (!request) {
      return -1
    }
  }

  /**
   * llhttp callback: (part of) a header name. A name may arrive in several
   * chunks; continuation chunks are concatenated onto the last entry.
   */
  onHeaderField (buf) {
    const len = this.headers.length

    if ((len & 1) === 0) {
      // Even length: the previous pair is complete, start a new name.
      this.headers.push(buf)
    } else {
      this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
    }

    this.trackHeader(buf.length)
  }

  /**
   * llhttp callback: (part of) a header value. Also accumulates the raw
   * values of `keep-alive`, `connection` and `content-length`.
   */
  onHeaderValue (buf) {
    let len = this.headers.length

    if ((len & 1) === 1) {
      // Odd length: the last entry is a name, start its value.
      this.headers.push(buf)
      len += 1
    } else {
      this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
    }

    // Check the byte length first so the name is only decoded when it can
    // match, and decode it at most once per chunk.
    const key = this.headers[len - 2]
    if (key.length === 10) {
      const name = key.toString().toLowerCase()
      if (name === 'keep-alive') {
        this.keepAlive += buf.toString()
      } else if (name === 'connection') {
        this.connection += buf.toString()
      }
    } else if (key.length === 14) {
      if (key.toString().toLowerCase() === 'content-length') {
        this.contentLength += buf.toString()
      }
    }

    this.trackHeader(buf.length)
  }

  /**
   * Adds `len` bytes to the running header size and destroys the socket with
   * a HeadersOverflowError once the configured maximum is reached.
   */
  trackHeader (len) {
    this.headersSize += len
    if (this.headersSize >= this.headersMaxSize) {
      util.destroy(this.socket, new HeadersOverflowError())
    }
  }

  /**
   * Hands the socket over to the request after an upgrade / CONNECT response.
   * The parser is destroyed, the socket is detached from the client and the
   * request is removed from the queue before `request.onUpgrade` is called.
   *
   * @param {Buffer} head Bytes received after the response head; pushed back
   *   onto the socket so the new owner can read them.
   */
  onUpgrade (head) {
    // Snapshot headers / statusCode before the parser state is reset below.
    const { upgrade, client, socket, headers, statusCode } = this

    assert(upgrade)

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(!socket.destroyed)
    assert(socket === client[kSocket])
    assert(!this.paused)
    assert(request.upgrade || request.method === 'CONNECT')

    this.statusCode = null
    this.statusText = ''
    this.shouldKeepAlive = null

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    socket.unshift(head)

    socket[kParser].destroy()
    socket[kParser] = null

    socket[kClient] = null
    socket[kError] = null
    socket
      .removeListener('error', onSocketError)
      .removeListener('readable', onSocketReadable)
      .removeListener('end', onSocketEnd)
      .removeListener('close', onSocketClose)

    client[kSocket] = null
    client[kQueue][client[kRunningIdx]++] = null
    client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

    try {
      request.onUpgrade(statusCode, headers, socket)
    } catch (err) {
      util.destroy(socket, err)
    }

    resume(client)
  }

  /**
   * llhttp callback: response head fully received.
   *
   * @returns {number} -1 on error (the socket has been destroyed), 2 for an
   *   upgrade / CONNECT response, 1 for HEAD and informational responses,
   *   `constants.ERROR.PAUSED` when the handler asked to pause, otherwise 0.
   */
  onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
    const { client, socket, headers, statusText } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]

    /* istanbul ignore next: difficult to make a test case for */
    if (!request) {
      return -1
    }

    assert(!this.upgrade)
    assert(this.statusCode < 200)

    if (statusCode === 100) {
      util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
      return -1
    }

    /* this can only happen if server is misbehaving */
    if (upgrade && !request.upgrade) {
      util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
      return -1
    }

    assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

    this.statusCode = statusCode
    this.shouldKeepAlive = (
      shouldKeepAlive ||
      // Override llhttp value which does not allow keepAlive for HEAD.
      (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
    )

    if (this.statusCode >= 200) {
      // Final response: switch from the headers timeout to the body timeout.
      const bodyTimeout = request.bodyTimeout != null
        ? request.bodyTimeout
        : client[kBodyTimeout]
      this.setTimeout(bodyTimeout, TIMEOUT_BODY)
    } else if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    if (request.method === 'CONNECT') {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    if (upgrade) {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    // `headers` (captured above) still references the collected list that is
    // passed to the handler below.
    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    if (this.shouldKeepAlive && client[kPipelining]) {
      const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

      if (keepAliveTimeout != null) {
        const timeout = Math.min(
          keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
          client[kKeepAliveMaxTimeout]
        )
        if (timeout <= 0) {
          socket[kReset] = true
        } else {
          client[kKeepAliveTimeoutValue] = timeout
        }
      } else {
        client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
      }
    } else {
      // Stop more requests from being dispatched.
      socket[kReset] = true
    }

    let pause
    try {
      pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
    } catch (err) {
      util.destroy(socket, err)
      return -1
    }

    if (request.method === 'HEAD') {
      return 1
    }

    if (statusCode < 200) {
      return 1
    }

    if (socket[kBlocking]) {
      socket[kBlocking] = false
      resume(client)
    }

    return pause ? constants.ERROR.PAUSED : 0
  }

  /**
   * llhttp callback: a chunk of the response body.
   *
   * @returns {number|undefined} -1 on error or when the response exceeds
   *   `maxResponseSize`, `constants.ERROR.PAUSED` when the handler returned
   *   `false`.
   */
  onBody (buf) {
    const { client, socket, statusCode, maxResponseSize } = this

    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    assert(statusCode >= 200)

    if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
      util.destroy(socket, new ResponseExceededMaxSizeError())
      return -1
    }

    this.bytesRead += buf.length

    try {
      if (request.onData(buf) === false) {
        return constants.ERROR.PAUSED
      }
    } catch (err) {
      util.destroy(socket, err)
      return -1
    }
  }

  /**
   * llhttp callback: response fully received. Resets the per-message state,
   * completes the request, advances the queue and decides whether the socket
   * can be reused.
   *
   * @returns {number|undefined} -1 on error, `constants.ERROR.PAUSED` when
   *   the socket was destroyed here and parsing must stop.
   */
  onMessageComplete () {
    // Snapshot the per-message state before it is reset below.
    const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

    if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
      return -1
    }

    if (upgrade) {
      return
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(statusCode >= 100)

    this.statusCode = null
    this.statusText = ''
    this.bytesRead = 0
    this.contentLength = ''
    this.keepAlive = ''
    this.connection = ''

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Informational response: the request stays at the head of the queue.
    if (statusCode < 200) {
      return
    }

    /* istanbul ignore next: should be handled by llhttp? */
    if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
      util.destroy(socket, new ResponseContentLengthMismatchError())
      return -1
    }

    try {
      request.onComplete(headers)
    } catch (err) {
      errorRequest(client, request, err)
    }

    client[kQueue][client[kRunningIdx]++] = null

    if (socket[kWriting]) {
      assert.strictEqual(client[kRunning], 0)
      // Response completed before request.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (!shouldKeepAlive) {
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (socket[kReset] && client[kRunning] === 0) {
      // Destroy socket once all requests have completed.
      // The request at the tail of the pipeline is the one
      // that requested reset and no further requests should
      // have been queued since then.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (client[kPipelining] === 1) {
      // We must wait a full event loop cycle to reuse this socket to make sure
      // that non-spec compliant servers are not closing the connection even if they
      // said they won't.
      setImmediate(resume, client)
    } else {
      resume(client)
    }
  }
}
|
||
|
|
||
|
/**
 * Reacts to an expired parser timer by destroying the socket with an error
 * that matches the kind of timeout that was armed.
 *
 * @param {object} parser - Parser whose timer fired; supplies `socket`,
 *   `timeoutType`, `client` and `paused`.
 */
function onParserTimeout (parser) {
  const { socket, timeoutType, client } = parser

  /* istanbul ignore next: unknown timeout types are ignored */
  switch (timeoutType) {
    case TIMEOUT_HEADERS: {
      // Only fail when nothing is being written, the socket is backed up,
      // or more than one request is in flight.
      const shouldFail = !socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1
      if (shouldFail) {
        assert(!parser.paused, 'cannot be paused while waiting for headers')
        util.destroy(socket, new HeadersTimeoutError())
      }
      break
    }
    case TIMEOUT_BODY:
      // A paused parser is not reading the body, so it is not timed out.
      if (!parser.paused) {
        util.destroy(socket, new BodyTimeoutError())
      }
      break
    case TIMEOUT_IDLE:
      assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
      util.destroy(socket, new InformationalError('socket idle timeout'))
      break
  }
}
|
||
|
|
||
|
/**
 * 'readable' listener for the socket: asks the attached parser, if there is
 * one, to read more data.
 */
function onSocketReadable () {
  const parser = this[kParser]
  if (!parser) {
    return
  }
  parser.readMore()
}
|
||
|
|
||
|
/**
 * 'error' listener for the socket. Records the error on the socket and
 * forwards it to `onError`, unless it is a connection reset that can be
 * treated as the end of a response that is not kept alive.
 *
 * @param {Error} err - Error emitted by the socket.
 */
function onSocketError (err) {
  const client = this[kClient]
  const parser = this[kParser]

  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  // On Mac OS an ECONNRESET can arrive even though a full body is ready to
  // be forwarded to the user; treat the data received so far as a valid
  // response in that case.
  const resetEndsResponse =
    client[kHTTPConnVersion] !== 'h2' &&
    err.code === 'ECONNRESET' &&
    parser.statusCode &&
    !parser.shouldKeepAlive

  if (resetEndsResponse) {
    parser.onMessageComplete()
    return
  }

  this[kError] = err

  onError(this[kClient], err)
}
|
||
|
|
||
|
/**
 * Fails every queued request with `err` when no request is running and the
 * error is neither informational nor a socket error.
 *
 * @param {object} client - Client whose queue may be failed.
 * @param {Error} err - Error to report to the queued requests.
 */
function onError (client, err) {
  const idle = client[kRunning] === 0
  if (!idle || err.code === 'UND_ERR_INFO' || err.code === 'UND_ERR_SOCKET') {
    return
  }

  // The error was not caused by a running request and is not a recoverable
  // socket error, so nothing that is still queued can proceed.
  assert(client[kPendingIdx] === client[kRunningIdx])

  const failed = client[kQueue].splice(client[kRunningIdx])
  for (const request of failed) {
    errorRequest(client, request, err)
  }

  assert(client[kSize] === 0)
}
|
||
|
|
||
|
/**
 * 'end' listener for the socket. For a non-HTTP/2 connection with a response
 * in progress that is not kept alive, the end of the stream completes the
 * message; in every other case the socket is destroyed with a SocketError.
 */
function onSocketEnd () {
  const parser = this[kParser]
  const client = this[kClient]

  const endCompletesResponse =
    client[kHTTPConnVersion] !== 'h2' &&
    parser.statusCode &&
    !parser.shouldKeepAlive

  if (endCompletesResponse) {
    // Everything received so far counts as a valid response.
    parser.onMessageComplete()
    return
  }

  util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}
|
||
|
|
||
|
/**
 * 'close' listener, registered on the socket and on the HTTP/2 session.
 * Tears down the HTTP/1 parser if one is attached, detaches the socket from
 * the client, fails the affected requests, emits 'disconnect' and resumes
 * the client.
 */
function onSocketClose () {
  const client = this[kClient]
  const parser = this[kParser]

  if (client[kHTTPConnVersion] === 'h1' && parser) {
    const closeCompletesResponse = !this[kError] && parser.statusCode && !parser.shouldKeepAlive
    if (closeCompletesResponse) {
      // Everything received so far counts as a valid response.
      parser.onMessageComplete()
    }

    this[kParser].destroy()
    this[kParser] = null
  }

  const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

  client[kSocket] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // The client is gone: fail whatever is left in the queue.
    const remaining = client[kQueue].splice(client[kRunningIdx])
    for (const request of remaining) {
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
    // Only the request at the head of the pipeline is failed.
    const head = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, head, err)
  }

  // Anything that was running but not failed goes back to pending.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect', client[kUrl], [client], err)

  resume(client)
}
|
||
|
|
||
|
/**
 * Opens a connection for `client` through `client[kConnector]` and prepares
 * it for use: an HTTP/2 session when the socket negotiated 'h2' via ALPN,
 * otherwise an HTTP/1 parser. Publishes the connect diagnostics channels,
 * emits 'connect' or 'connectionError', and finally resumes the client.
 *
 * @param {object} client - Client that currently has neither a socket nor a
 *   connection attempt in progress.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kSocket])

  let { host, hostname, protocol, port } = client[kUrl]

  // Strip the brackets from an IPv6 literal such as "[::1]".
  if (hostname[0] === '[') {
    const closing = hostname.indexOf(']')
    assert(closing !== -1)

    const address = hostname.substring(1, closing)
    assert(net.isIP(address))

    hostname = address
  }

  client[kConnecting] = true

  // Returns a fresh parameter object on every call so that the connector
  // and the diagnostics subscribers never share one instance.
  const connectParams = () => ({
    host,
    hostname,
    protocol,
    port,
    servername: client[kServerName],
    localAddress: client[kLocalAddress]
  })

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: connectParams(),
      connector: client[kConnector]
    })
  }

  try {
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](connectParams(), (err, sock) => {
        if (err) {
          reject(err)
          return
        }
        resolve(sock)
      })
    })

    if (client.destroyed) {
      // The client was destroyed while connecting: discard the socket and
      // swallow whatever error the destroy produces.
      util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
      return
    }

    client[kConnecting] = false

    assert(socket)

    if (socket.alpnProtocol === 'h2') {
      if (!h2ExperimentalWarned) {
        h2ExperimentalWarned = true
        process.emitWarning('H2 support is experimental, expect them to change at any time.', {
          code: 'UNDICI-H2'
        })
      }

      const session = http2.connect(client[kUrl], {
        createConnection: () => socket,
        peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
      })

      client[kHTTPConnVersion] = 'h2'
      session[kClient] = client
      session[kSocket] = socket
      session.on('error', onHttp2SessionError)
      session.on('frameError', onHttp2FrameError)
      session.on('end', onHttp2SessionEnd)
      session.on('goaway', onHTTP2GoAway)
      session.on('close', onSocketClose)
      session.unref()

      client[kHTTP2Session] = session
      socket[kHTTP2Session] = session
    } else {
      // The parser instance is loaded lazily, on the first HTTP/1 connection.
      if (!llhttpInstance) {
        llhttpInstance = await llhttpPromise
        llhttpPromise = null
      }

      socket[kNoRef] = false
      socket[kWriting] = false
      socket[kReset] = false
      socket[kBlocking] = false
      socket[kParser] = new Parser(client, socket, llhttpInstance)
    }

    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket[kClient] = client
    socket[kError] = null

    socket
      .on('error', onSocketError)
      .on('readable', onSocketReadable)
      .on('end', onSocketEnd)
      .on('close', onSocketClose)

    client[kSocket] = socket

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: connectParams(),
        connector: client[kConnector],
        socket
      })
    }
    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: connectParams(),
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      // Only the leading pending requests that asked for the rejected
      // server name are failed.
      assert(client[kRunning] === 0)
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const rejected = client[kQueue][client[kPendingIdx]++]
        errorRequest(client, rejected, err)
      }
    } else {
      onError(client, err)
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  resume(client)
}
|
||
|
|
||
|
/**
 * Clears the client's need-drain flag and emits 'drain'.
 *
 * @param {object} client - Client to notify.
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const origin = client[kUrl]
  client.emit('drain', origin, [client])
}
|
||
|
|
||
|
/**
 * Re-entrancy-guarded wrapper around `_resume`. After the dispatch pass it
 * drops the already-consumed queue entries once more than 256 of them have
 * accumulated.
 *
 * @param {object} client - Client to resume.
 * @param {boolean} [sync] - Forwarded to `_resume`.
 */
function resume (client, sync) {
  // A resume pass is already in progress for this client.
  if (client[kResuming] === 2) {
    return
  }

  client[kResuming] = 2
  _resume(client, sync)
  client[kResuming] = 0

  const consumed = client[kRunningIdx]
  if (consumed > 256) {
    // Drop the consumed prefix and shift both indices back accordingly.
    client[kQueue].splice(0, consumed)
    client[kPendingIdx] -= consumed
    client[kRunningIdx] = 0
  }
}
|
||
|
|
||
|
/**
 * Dispatch loop of the client. On every iteration it updates the socket's
 * ref state and parser timeout, handles drain notification, and then tries
 * to write the next pending request. It returns as soon as nothing more can
 * be dispatched.
 *
 * @param {object} client - Client whose queue is processed.
 * @param {boolean} [sync] - When truthy, 'drain' is emitted on the next tick
 *   instead of immediately.
 */
function _resume (client, sync) {
  while (true) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    // A pending close is resolved as soon as the queue is empty.
    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    const socket = client[kSocket]

    if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
      const parser = socket[kParser]

      if (client[kSize] === 0) {
        // Nothing queued: unref the socket and arm the idle timeout.
        if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }

        if (parser.timeoutType !== TIMEOUT_IDLE) {
          parser.setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
        }
      } else {
        // Work is queued: ref the socket again.
        if (socket[kNoRef] && socket.ref) {
          socket.ref()
          socket[kNoRef] = false
        }

        // While a running request has not received a final status yet, make
        // sure the headers timeout is the one that is armed.
        if (client[kRunning] > 0 && parser.statusCode < 200 && parser.timeoutType !== TIMEOUT_HEADERS) {
          const head = client[kQueue][client[kRunningIdx]]
          const headersTimeout = head.headersTimeout != null
            ? head.headersTimeout
            : client[kHeadersTimeout]
          parser.setTimeout(headersTimeout, TIMEOUT_HEADERS)
        }
      }
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1
        process.nextTick(emitDrain, client)
      } else {
        emitDrain(client)
      }
      continue
    }

    // Nothing left to dispatch, or the pipeline is already full.
    if (client[kPending] === 0 || client[kRunning] >= (client[kPipelining] || 1)) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
      // The server name can only change once the pipeline has drained.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername

      if (socket && socket.servername !== request.servername) {
        util.destroy(socket, new InformationalError('servername changed'))
        return
      }
    }

    if (client[kConnecting]) {
      return
    }

    if (!socket && !client[kHTTP2Session]) {
      connect(client)
      return
    }

    if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
      return
    }

    if (client[kRunning] > 0) {
      // A non-idempotent request cannot be retried, so it must not share
      // the connection with requests that are still in flight.
      if (!request.idempotent) {
        return
      }

      // Hold back upgrades until all preceding requests have completed; a
      // misbehaving server might upgrade the connection too early.
      if (request.upgrade || request.method === 'CONNECT') {
        return
      }

      // A stream or async-iterable body can fail mid-flight, cannot be
      // retried, and would indirectly fail the other in-flight requests.
      if (util.bodyLength(request.body) !== 0 &&
        (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
        return
      }
    }

    if (!request.aborted && write(client, request)) {
      client[kPendingIdx]++
    } else {
      // Aborted, or not accepted by write(): remove it from the queue.
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
|
||
|
|
||
|
// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
|
||
|
/**
 * Tells whether a content-length mismatch should be checked for `method`.
 *
 * @param {string} method - HTTP request method.
 * @returns {boolean} `false` for GET, HEAD, OPTIONS, TRACE and CONNECT,
 *   `true` for every other value.
 */
function shouldSendContentLength (method) {
  switch (method) {
    case 'GET':
    case 'HEAD':
    case 'OPTIONS':
    case 'TRACE':
    case 'CONNECT':
      return false
    default:
      return true
  }
}
|
||
|
|
||
|
/**
 * Writes `request` to the client's connection.
 *
 * For an HTTP/2 connection the request is handed to `writeH2` and nothing is
 * returned. For HTTP/1 the request line and headers are serialized and the
 * body is written according to its type.
 *
 * @param {object} client - Client owning the connection.
 * @param {object} request - Request to send.
 * @returns {boolean|undefined} `true` when the HTTP/1 request was written or
 *   handed to a body writer, `false` when it was rejected or aborted,
 *   `undefined` for HTTP/2.
 */
function write (client, request) {
  if (client[kHTTPConnVersion] === 'h2') {
    writeH2(client, client[kHTTP2Session], request)
    return
  }

  const { body, method, path, host, upgrade, headers, blocking, reset } = request

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5
  //
  // A payload on a method that does not expect one can cause undefined
  // behavior on some servers and corrupt the connection state, so such a
  // connection is not reused (see the kReset assignments below and in the
  // body writers).
  const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get the length.
    body.read(0)
  }

  const bodyLength = util.bodyLength(body)

  let contentLength = bodyLength === null ? request.contentLength : bodyLength

  if (contentLength === 0 && !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when the
    // request has no payload body and the method does not anticipate one.
    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A Content-Length header with the value 0 is allowed, hence the `> 0`.
  const lengthMismatch =
    shouldSendContentLength(method) &&
    contentLength > 0 &&
    request.contentLength !== null &&
    request.contentLength !== contentLength

  if (lengthMismatch) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  const socket = client[kSocket]

  try {
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())

      util.destroy(socket, new InformationalError('aborted'))
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  // HEAD: close afterwards to interoperate with servers that wrongly send a
  // response body (https://github.com/mcollina/undici/issues/258).
  // CONNECT / upgrade: block the pipeline from dispatching further requests
  // on this connection.
  if (method === 'HEAD' || upgrade || method === 'CONNECT') {
    socket[kReset] = true
  }

  // An explicit per-request setting overrides the defaults above.
  if (reset != null) {
    socket[kReset] = reset
  }

  if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
    socket[kReset] = true
  }

  if (blocking) {
    socket[kBlocking] = true
  }

  let header = `${method} ${path} HTTP/1.1\r\n`

  header += typeof host === 'string' ? `host: ${host}\r\n` : client[kHostHeader]

  if (upgrade) {
    header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  } else if (client[kPipelining] && !socket[kReset]) {
    header += 'connection: keep-alive\r\n'
  } else {
    header += 'connection: close\r\n'
  }

  if (headers) {
    header += headers
  }

  if (channels.sendHeaders.hasSubscribers) {
    channels.sendHeaders.publish({ request, headers: header, socket })
  }

  /* istanbul ignore else: assertion */
  if (!body || bodyLength === 0) {
    if (contentLength === 0) {
      socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
    } else {
      assert(contentLength === null, 'no body must not have content length')
      socket.write(`${header}\r\n`, 'latin1')
    }
    request.onRequestSent()
  } else if (util.isBuffer(body)) {
    assert(contentLength === body.byteLength, 'buffer body must have content length')

    // Cork so that headers and body are flushed together.
    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(body)
    socket.uncork()
    request.onBodySent(body)
    request.onRequestSent()
    if (!expectsPayload) {
      socket[kReset] = true
    }
  } else if (util.isBlobLike(body)) {
    if (typeof body.stream === 'function') {
      writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
    } else {
      writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
    }
  } else if (util.isStream(body)) {
    writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
  } else if (util.isIterable(body)) {
    writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
  } else {
    assert(false)
  }

  return true
}
|
||
|
|
||
|
/**
 * Sends `request` as a new stream on the HTTP/2 `session`.
 *
 * A CONNECT request opens a stream that is handed to `request.onUpgrade`.
 * Any other request gets its pseudo-headers and content-length set, has its
 * body written by the nested `writeBodyH2`, and has the stream events wired
 * to the request callbacks.
 *
 * @param {object} client - Client owning the session.
 * @param {object} session - HTTP/2 session on which the stream is opened.
 * @param {object} request - Request to send.
 * @returns {boolean} `false` when the request was rejected (upgrade or
 *   content-length mismatch in strict mode) or aborted before a stream was
 *   opened; `true` once a stream has been requested.
 */
function writeH2 (client, session, request) {
  const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

  let headers
  if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim())
  else headers = reqHeaders

  if (upgrade) {
    errorRequest(client, request, new Error('Upgrade not supported for H2'))
    return false
  }

  try {
    // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  let stream
  const h2State = client[kHTTP2SessionState]

  headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
  headers[HTTP2_HEADER_METHOD] = method

  if (method === 'CONNECT') {
    session.ref()
    // The session is already connected. Requesting a stream creates it; if
    // it is not ready yet, wait for its `ready` event before handing it over.
    // endStream is disabled so that the user can write to the stream.
    stream = session.request(headers, { endStream: false, signal })

    if (stream.id && !stream.pending) {
      request.onUpgrade(null, null, stream)
      ++h2State.openStreams
    } else {
      stream.once('ready', () => {
        request.onUpgrade(null, null, stream)
        ++h2State.openStreams
      })
    }

    stream.once('close', () => {
      h2State.openStreams -= 1
      // TODO(HTTP/2): unref only if current streams count is 0
      if (h2State.openStreams === 0) session.unref()
    })

    return true
  }

  // https://tools.ietf.org/html/rfc7540#section-8.3
  // :path and :scheme headers must be omitted when sending CONNECT, which is
  // why they are only set after the CONNECT branch above.

  headers[HTTP2_HEADER_PATH] = path
  headers[HTTP2_HEADER_SCHEME] = 'https'

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength == null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 || !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  if (contentLength != null) {
    assert(body, 'no body must not have content length')
    headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  }

  session.ref()

  // GET and HEAD streams are half-closed right after the headers are sent.
  const shouldEndStream = method === 'GET' || method === 'HEAD'
  if (expectContinue) {
    headers[HTTP2_HEADER_EXPECT] = '100-continue'
    /**
     * @type {import('node:http2').ClientHttp2Stream}
     */
    stream = session.request(headers, { endStream: shouldEndStream, signal })

    // The body is only written once the server has sent "100 Continue".
    stream.once('continue', writeBodyH2)
  } else {
    /** @type {import('node:http2').ClientHttp2Stream} */
    stream = session.request(headers, {
      endStream: shouldEndStream,
      signal
    })
    writeBodyH2()
  }

  // One more stream is open on this session.
  ++h2State.openStreams

  stream.once('response', headers => {
    if (request.onHeaders(Number(headers[HTTP2_HEADER_STATUS]), headers, stream.resume.bind(stream), '') === false) {
      stream.pause()
    }
  })

  stream.once('end', () => {
    request.onComplete([])
  })

  stream.on('data', (chunk) => {
    if (request.onData(chunk) === false) stream.pause()
  })

  stream.once('close', () => {
    h2State.openStreams -= 1
    // TODO(HTTP/2): unref only if current streams count is 0
    if (h2State.openStreams === 0) session.unref()
  })

  stream.once('error', function (err) {
    if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !stream.closed && !stream.destroyed) {
      // NOTE(review): every other counter update in this function uses
      // `openStreams`; `streams` looks like a leftover name - confirm against
      // the definition of the session state object.
      h2State.streams -= 1
      util.destroy(stream, err)
    }
  })

  stream.once('frameError', (type, code) => {
    const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    errorRequest(client, request, err)

    // This listener is an arrow function, so `this` is not the stream here;
    // the stream must be referenced through the closure variable.
    if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !stream.closed && !stream.destroyed) {
      // NOTE(review): see the note on `streams` in the 'error' listener.
      h2State.streams -= 1
      util.destroy(stream, err)
    }
  })

  // TODO(HTTP/2): Support the 'aborted', 'timeout', 'push' and 'trailers'
  // stream events.

  return true

  /**
   * Writes the request body to `stream` according to the body's type.
   * Declared after the `return` above; it is available earlier because
   * function declarations are hoisted.
   */
  function writeBodyH2 () {
    /* istanbul ignore else: assertion */
    if (!body) {
      request.onRequestSent()
    } else if (util.isBuffer(body)) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')
      stream.cork()
      stream.write(body)
      stream.uncork()
      stream.end()
      request.onBodySent(body)
      request.onRequestSent()
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable({
          client,
          request,
          contentLength,
          h2stream: stream,
          expectsPayload,
          body: body.stream(),
          socket: client[kSocket],
          header: ''
        })
      } else {
        writeBlob({
          body,
          client,
          request,
          contentLength,
          expectsPayload,
          h2stream: stream,
          header: '',
          socket: client[kSocket]
        })
      }
    } else if (util.isStream(body)) {
      writeStream({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        socket: client[kSocket],
        h2stream: stream,
        header: ''
      })
    } else if (util.isIterable(body)) {
      writeIterable({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        header: '',
        h2stream: stream,
        socket: client[kSocket]
      })
    } else {
      assert(false)
    }
  }
}
|
||
|
|
||
|
/**
 * Writes a stream body. On HTTP/2 the body is piped into `h2stream`; on
 * HTTP/1 the chunks are forwarded through an `AsyncWriter`, with the body
 * paused and resumed according to the socket's backpressure.
 *
 * @param {object} options
 * @param {object} [options.h2stream] - HTTP/2 stream (HTTP/2 only).
 * @param {object} options.body - Stream providing the request body.
 * @param {object} options.client - Client owning the connection.
 * @param {object} options.request - Request being sent.
 * @param {object} options.socket - Socket of the connection.
 * @param {number|null} options.contentLength - Expected body length.
 * @param {string} options.header - Serialized HTTP/1 request head.
 * @param {boolean} options.expectsPayload - Whether the method expects a body.
 */
function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  if (client[kHTTPConnVersion] === 'h2') {
    const reportChunk = (chunk) => {
      request.onBodySent(chunk)
    }

    // For HTTP/2 it is enough to pipe the body into the stream.
    const piped = pipeline(body, h2stream, (err) => {
      if (!err) {
        request.onRequestSent()
        return
      }
      util.destroy(body, err)
      util.destroy(h2stream, err)
    })

    piped.on('data', reportChunk)
    piped.once('end', () => {
      piped.removeListener('data', reportChunk)
      util.destroy(piped)
    })

    return
  }

  let finished = false

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  // Forwards one body chunk; pauses the body when the writer reports
  // backpressure.
  function onData (chunk) {
    if (finished) {
      return
    }

    try {
      const keepGoing = writer.write(chunk)
      if (!keepGoing && body.pause) {
        body.pause()
      }
    } catch (err) {
      util.destroy(body, err)
    }
  }

  // The socket drained: let the body flow again.
  function onDrain () {
    if (finished) {
      return
    }

    if (body.resume) {
      body.resume()
    }
  }

  // The body closed before it ended or errored.
  function onAbort () {
    onFinished(new RequestAbortedError())
  }

  // Runs once, for whichever of end/error/close/socket-error comes first.
  function onFinished (err) {
    if (finished) {
      return
    }

    finished = true

    assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

    socket
      .off('drain', onDrain)
      .off('error', onFinished)

    body
      .removeListener('data', onData)
      .removeListener('end', onFinished)
      .removeListener('error', onFinished)
      .removeListener('close', onAbort)

    if (!err) {
      try {
        writer.end()
      } catch (er) {
        err = er
      }
    }

    writer.destroy(err)

    // An informational 'reset' is not propagated to the body as an error.
    const isReset = err && err.code === 'UND_ERR_INFO' && err.message === 'reset'
    if (err && !isReset) {
      util.destroy(body, err)
    } else {
      util.destroy(body)
    }
  }

  body
    .on('data', onData)
    .on('end', onFinished)
    .on('error', onFinished)
    .on('close', onAbort)

  if (body.resume) {
    body.resume()
  }

  socket
    .on('drain', onDrain)
    .on('error', onFinished)
}
|
||
|
|
||
|
/**
 * Writes a blob body: reads it fully into a buffer and sends it in one go,
 * either on the HTTP/2 stream or, together with the request head, on the
 * HTTP/1 socket. Any failure destroys the stream or socket that was being
 * written to.
 *
 * @param {object} options
 * @param {object} [options.h2stream] - HTTP/2 stream (HTTP/2 only).
 * @param {object} options.body - Blob-like body exposing `size` and
 *   `arrayBuffer()`.
 * @param {object} options.client - Client owning the connection.
 * @param {object} options.request - Request being sent.
 * @param {object} options.socket - Socket of the connection.
 * @param {number|null} options.contentLength - Expected body length.
 * @param {string} options.header - Serialized HTTP/1 request head.
 * @param {boolean} options.expectsPayload - Whether the method expects a body.
 * @returns {Promise<void>}
 */
async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength === body.size, 'blob body must have content length')

  const isH2 = client[kHTTPConnVersion] === 'h2'
  try {
    if (contentLength != null && contentLength !== body.size) {
      throw new RequestContentLengthMismatchError()
    }

    const buffer = Buffer.from(await body.arrayBuffer())

    if (isH2) {
      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      // Half-close the stream so the peer knows the request body is
      // complete, as the buffer and iterable body paths do.
      h2stream.end()
    } else {
      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(buffer)
      socket.uncork()
    }

    request.onBodySent(buffer)
    request.onRequestSent()

    // A payload on a method that does not expect one: do not reuse the
    // connection.
    if (!expectsPayload) {
      socket[kReset] = true
    }

    resume(client)
  } catch (err) {
    util.destroy(isH2 ? h2stream : socket, err)
  }
}
|
||
|
|
||
|
/**
 * Writes an iterable or async-iterable body chunk by chunk, waiting for
 * 'drain' (or 'close') whenever the destination reports backpressure. On
 * HTTP/2 the chunks go straight to `h2stream`; on HTTP/1 they go through an
 * `AsyncWriter`.
 *
 * @param {object} options
 * @param {object} [options.h2stream] - HTTP/2 stream (HTTP/2 only).
 * @param {object} options.body - Iterable or async iterable of chunks.
 * @param {object} options.client - Client owning the connection.
 * @param {object} options.request - Request being sent.
 * @param {object} options.socket - Socket of the connection.
 * @param {number|null} options.contentLength - Expected body length.
 * @param {string} options.header - Serialized HTTP/1 request head.
 * @param {boolean} options.expectsPayload - Whether the method expects a body.
 * @returns {Promise<void>}
 */
async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

  // Resolver of the promise currently waiting for 'drain', if any.
  let callback = null
  function onDrain () {
    if (callback) {
      const cb = callback
      callback = null
      cb()
    }
  }

  const waitForDrain = () => new Promise((resolve, reject) => {
    assert(callback === null)

    if (socket[kError]) {
      reject(socket[kError])
    } else {
      callback = resolve
    }
  })

  if (client[kHTTPConnVersion] === 'h2') {
    h2stream
      .on('close', onDrain)
      .on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const res = h2stream.write(chunk)
        request.onBodySent(chunk)
        if (!res) {
          await waitForDrain()
        }
      }

      // Only once the whole body has been written is the request reported
      // as sent and the stream half-closed. On failure the stream is
      // destroyed in the catch below instead.
      request.onRequestSent()
      h2stream.end()
    } catch (err) {
      h2stream.destroy(err)
    } finally {
      h2stream
        .off('close', onDrain)
        .off('drain', onDrain)
    }

    return
  }

  socket
    .on('close', onDrain)
    .on('drain', onDrain)

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
  try {
    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      if (!writer.write(chunk)) {
        await waitForDrain()
      }
    }

    writer.end()
  } catch (err) {
    writer.destroy(err)
  } finally {
    socket
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
|
||
|
|
||
|
/**
 * Incremental writer for a request body on a socket. The header block is
 * emitted lazily: together with the first non-empty chunk, or from `end()`
 * when no bytes were written. A `null` content length selects chunked
 * transfer encoding.
 */
class AsyncWriter {
  /**
   * Stores the write context and flags the socket as being written to.
   */
  constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
    Object.assign(this, {
      socket,
      request,
      contentLength,
      client,
      bytesWritten: 0,
      expectsPayload,
      header
    })

    socket[kWriting] = true
  }

  /**
   * Writes one body chunk, preceded by the header block on the first
   * non-empty chunk.
   *
   * @param {string|Buffer|Uint8Array} chunk
   * @returns {boolean} `false` if the socket is destroyed, `true` for an empty
   *   chunk, otherwise the result of `socket.write(chunk)`.
   * @throws The socket's recorded error, or RequestContentLengthMismatchError
   *   when the chunk would exceed the declared length on a strict client.
   */
  write (chunk) {
    const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this

    const socketError = socket[kError]
    if (socketError) {
      throw socketError
    }

    if (socket.destroyed) {
      return false
    }

    const size = Buffer.byteLength(chunk)
    if (size === 0) {
      return true
    }

    const chunked = contentLength === null

    // Writing more than the declared content-length is fatal on strict
    // clients and only a process warning otherwise.
    if (!chunked && bytesWritten + size > contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    socket.cork()

    if (bytesWritten === 0) {
      // First payload bytes: the header block goes out ahead of them.
      if (!expectsPayload) {
        socket[kReset] = true
      }

      if (chunked) {
        socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1')
      } else {
        socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      }
    }

    if (chunked) {
      // Chunk-size line; its leading CRLF closes whatever was written before it.
      socket.write(`\r\n${size.toString(16)}\r\n`, 'latin1')
    }

    this.bytesWritten += size

    const flushed = socket.write(chunk)

    socket.uncork()

    request.onBodySent(chunk)

    if (!flushed) {
      // Backpressure: refresh a running headers timeout, when the timer
      // object supports refresh().
      const parser = socket[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS && parser.timeout.refresh) {
        parser.timeout.refresh()
      }
    }

    return flushed
  }

  /**
   * Finishes the body: notifies the request, writes the header block (if
   * nothing was written) or the chunked terminator, checks the declared
   * length and resumes the client.
   *
   * @throws The socket's recorded error, or RequestContentLengthMismatchError
   *   on a length mismatch with a strict client.
   */
  end () {
    const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
    request.onRequestSent()

    socket[kWriting] = false

    const socketError = socket[kError]
    if (socketError) {
      throw socketError
    }

    if (socket.destroyed) {
      return
    }

    if (bytesWritten === 0) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD send a Content-Length in a request message when
      // no Transfer-Encoding is sent and the request method defines a meaning
      // for an enclosed payload body.
      const tail = expectsPayload ? 'content-length: 0\r\n\r\n' : '\r\n'
      socket.write(`${header}${tail}`, 'latin1')
    } else if (contentLength === null) {
      // Zero-length chunk terminating the chunked body.
      socket.write('\r\n0\r\n\r\n', 'latin1')
    }

    if (contentLength !== null && bytesWritten !== contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    const parser = socket[kParser]
    if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS && parser.timeout.refresh) {
      parser.timeout.refresh()
    }

    resume(client)
  }

  /**
   * Clears the socket's writing flag and, when an error is given, destroys
   * the socket with it.
   *
   * @param {Error} [err]
   */
  destroy (err) {
    const { socket, client } = this

    socket[kWriting] = false

    if (!err) {
      return
    }

    assert(client[kRunning] <= 1, 'pipeline should only contain this request')
    util.destroy(socket, err)
  }
}
|
||
|
|
||
|
/**
 * Delivers `err` to `request.onError` and asserts that the request is flagged
 * as aborted afterwards. If the handler throws or the assertion fails, that
 * failure is emitted as an 'error' event on `client` instead of propagating.
 *
 * @param {{ emit: Function }} client - Receives the 'error' event on failure.
 * @param {{ onError: Function, aborted: boolean }} request - Request to fail.
 * @param {Error} err - Error handed to the request.
 */
function errorRequest (client, request, err) {
  try {
    request.onError(err)
    assert(request.aborted)
  } catch (failure) {
    // Distinct name so the original `err` parameter is not shadowed.
    client.emit('error', failure)
  }
}
|
||
|
|
||
|
// Expose `Client` as this module's sole export.
module.exports = Client
|