Jealh's Blog

𝓣𝓱𝓲𝓼 𝓲𝓼 𝓪 𝓫𝓮𝓪𝓾𝓽𝓲𝓯𝓾𝓵 𝓼𝓾𝓫𝓽𝓲𝓽𝓵𝓮

0%

koa源码阅读

前言:koa 作为一个轻量版的 node 服务端框架,其独特的洋葱圈模型是值得一探究竟的。这篇 blog 就来学习一下其中的奥妙吧。

开始

首先,我们可以从其使用方式开始。通过引入 koa, 创建一个 app 实例,通过 use 方法,拦截请求。然后 listen 端口。

1
2
3
4
5
6
7
8
const Koa = require("koa");
const app = new Koa();

app.use(async (ctx) => {
ctx.body = "Hello World";
});

app.listen(3000);

打开源码可以发现,默认导出的是一个 Application 对象。

1
module.exports = class Application extends Emitter {};

Application 对象

constructor

在构造函数中,主要做的就是创建 middleware 数组,内置 context、request、response 对象。同时可以传入如下 option

1
2
3
4
5
6
7
8
constructor options = {
env: string, // Environment
keys: string[], // Signed cookie keys
proxy: boolean, // Trust proxy headers
subdomainOffset: number, // Subdomain offset
proxyIpHeader: string, // Proxy IP header,defaults to X-Forwarded-For
maxIpsCount: number, // Max IPs read from proxy IPheader, default to 0 (means infinity)
}

支持动态设定参数。因为在构造函数中会把 option 的属性赋值到属性上 this.proxy = options.proxy || false

1
2
3
const Koa = require("koa");
const app = new Koa({ proxy: true });
app.proxy = false; // 动态设置参数

设置完一些配置后,会设置内置的 middleware、context、response、request 属性。

1
2
3
4
5
this.middleware = [];
// context request response 为 koa的内置对象,分别在context.js request.js response.js 文件中
this.context = Object.create(context);
this.request = Object.create(request);
this.response = Object.create(response);

use

use 方法是将提供的 middleware 添加到 this.middleware 里面,需要注意的是,use 接受的参数必须是函数。

在最上边,我们添加了一个 middleware。

1
2
3
async (ctx) => {
ctx.body = "Hello World";
};

listen

listen 是 http.createServer(app.callback()).listen(3000) 的语法糖。所以,其内部就是创建一个 http.Server, 然后调用调用 listen 方法。

1
2
3
4
5
listen (...args) {
debug('listen')
const server = http.createServer(this.callback())
return server.listen(...args)
}

callback

这个方法在 listen 方法中调用。返回一个 handleRequest 函数,这个函数的参数就是 createServer 里的 http.IncomingMessage, http.ServerResponse对象。通过这两个对象, koa 会创建自己的 context 对象。

1
2
3
4
5
6
7
8
9
10
11
12
callback () {
const fn = compose(this.middleware)

if (!this.listenerCount('error')) this.on('error', this.onerror)

const handleRequest = (req, res) => {
const ctx = this.createContext(req, res)
return this.handleRequest(ctx, fn)
}

return handleRequest
}

compose 这个方法使用的是 koa-compose 包,其内部有一丢丢复杂。我们可以先看 handleRequest

handleRequest

这是实际处理请求的函数。传入的是 context, 以及 compose 函数返回的一个 fnMiddleware。
函数里面就通过 context 获取原始的 http.IncomingMessage 对象,然后先设置
statusCode = 404,(这里显式地设置 404,想想后面在哪设置 200 呢?提示:getter/setter) 最后通过 respond 函数

1
2
3
4
5
6
7
8
handleRequest (ctx, fnMiddleware) {
const res = ctx.res
res.statusCode = 404
const onerror = err => ctx.onerror(err)
const handleResponse = () => respond(ctx)
onFinished(res, onerror)
return fnMiddleware(ctx).then(handleResponse).catch(onerror)
}

从这里我们可以看到 compose 函数返回的是一个函数,并且会接受 context 参数。

compose

从上面,我们知道了 compose 返回的是一个函数,并且会接受 context 参数,那下面就来一睹全貌吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
function compose(middleware) {
if (!Array.isArray(middleware))
throw new TypeError("Middleware stack must be an array!");
for (const fn of middleware) {
if (typeof fn !== "function")
throw new TypeError("Middleware must be composed of functions!");
}

/**
* @param {Object} context
* @return {Promise}
* @api public
* 这里会接受 context,而且还可以接受一个 next
*/
return function (context, next) {
// last called middleware #
let index = -1;
return dispatch(0);
function dispatch(i) {
if (i <= index)
return Promise.reject(new Error("next() called multiple times"));
index = i;
let fn = middleware[i];
if (i === middleware.length) fn = next;
if (!fn) return Promise.resolve();
try {
return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
} catch (err) {
return Promise.reject(err);
}
}
};
}

光看代码是有点抽象的,最好的方法就是模拟一遍,我们就以最开始的代码为例。

  1. 首先 middleware 数组是这样的,只有一个元素,那我们开始执行 compose。
1
2
3
4
5
[
async (ctx) => {
ctx.body = "Hello World";
},
];
  1. compose 单单只是返回一个函数 function (context, next) {...},只有在 handleRequest 中才会执行。并且这里只传入了 context, next = undefined。
    即是:
1
fnMiddleware(ctx).then(handleResponse).catch(onerror);
  1. 进入 fnMiddleware 函数里,会执行 dispatch(0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
function dispatch(i) {
if (i <= index)
// i = 0, index = -1
return Promise.reject(new Error("next() called multiple times"));
index = i;
let fn = middleware[i];
if (i === middleware.length) fn = next;
if (!fn) return Promise.resolve();
try {
return Promise.resolve(fn(context, dispatch.bind(null, i + 1)));
} catch (err) {
return Promise.reject(err);
}
}

这里就会执行, 也就是我们自己的中间件,从上面可以看到,我们自己写的中间件函数会默认传入两个参数 fn(context, dispatch.bind(null, i + 1)), context 与 dispatch.bind(null, i+1)。这里的 dispatch 就是我们中间件里面的 next。

从这里我们可以看到,如果在我们写的中间件里面不执行 next 的话,那就只会执行第一次 use 添加的函数了。

1
2
3
async (ctx) => {
ctx.body = "Hello World";
},
  1. 再来看看多个 middleware 的情况。
1
2
3
4
5
6
7
8
9
[
async (ctx, next) => {
await next();
ctx.body = "Hello World";
},
async (ctx) => {
console.log("2");
},
];

还是执行到 fn(context, dispatch.bind(null, i + 1)),这里 fn 就是第一个中间件,在第一个中间件里,我们执行了 next, 也就是 dispatch(1)。在 dispatch(1) 里,我们又会执行到 fn(context, dispatch.bind(null, i + 1)) 这里,只不过,这里的 fn 是 middleware[1], 也就是 console.log(“2”)这个中间件。

在第二个中间件里我们没有执行 next 函数了,这样就回溯到了第一个中间件 ctx.body = "hello world" 这里了, 这里执行完后,就会来到 handleRequest 函数 fnMiddleware(ctx).then(handleResponse) 然后执行 handleResponse -> respond。

如果我们在第二个中间件中继续执行 next 呢。这里也就是 dispatch(2), 会进入 if (i === middleware.length) fn = next; 这里,这里的 next 是最开始 fnMiddleware(ctx) 传入的,显然是 undefined, 然后走 if (!fn) return Promise.resolve(); 这个逻辑,结束递归。

compose 的限制

从上面的例子在扩展下,如果我们执行到 dispatch(2) 再去执行一下 next, 会怎样呢?

别忘了,我们还有一个分支没有走到呢!

1
2
if (i <= index)
return Promise.reject(new Error("next() called multiple times"));

在 dispatch(2) 后,再去执行一次 next, 相当于再执行一次 dispatch(2)。为什么这么说呢,因为在最后一个中间件里面,我们的 next 函数就是 fnMiddleware 里的第二个参数,即 undefined 了,这里走的逻辑是 if (!fn) return Promise.resolve();, 直接 resolve 掉了。并没有执行 dispatch.bind(null, i + 1) 这里。然而不同的是 compose 函数里面的闭包 index。index 的含义其实是上一次的 i。

当我们在最后一个中间件中执行两次 next 时,第二次 next 就满足了 i <= index 这个条件,抛出 "next() called multiple times" 错误。

因此,这也就限制了 next 的执行次数,其实可想而知,肯定不会让你执行超过 middleware 长度的 next 了啊

respond

回到 handleRequest 里面,执行完所有的 middleware 后,就会开始执行 handleResponse 了,也就是 respond(ctx) 了。

在这个函数里面,如果设置 ctx.respond = false, 就代表要我们自己处理响应数据。一般来说都是 koa 来处理,官方也不建议自己去处理https://koajs.com/#ctx-respond

然后就是处理一些比如:

  • 响应 body 需要为空的状态码(304 缓存、204 无内容)就清空 body
  • HEAD 请求响应标头未发送时并且 response 拥有 Content-Length 时设置 ctx 的 length 属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
function respond(ctx) {
// allow bypassing koa
if (ctx.respond === false) return;

if (!ctx.writable) return;

const res = ctx.res;
let body = ctx.body;
const code = ctx.status;

// ignore body
if (statuses.empty[code]) {
// strip headers
ctx.body = null;
return res.end();
}

if (ctx.method === "HEAD") {
if (!res.headersSent && !ctx.response.has("Content-Length")) {
const { length } = ctx.response;
if (Number.isInteger(length)) ctx.length = length;
}
return res.end();
}

// status body
if (body == null) {
if (ctx.response._explicitNullBody) {
ctx.response.remove("Content-Type");
ctx.response.remove("Transfer-Encoding");
ctx.length = 0;
return res.end();
}
if (ctx.req.httpVersionMajor >= 2) {
body = String(code);
} else {
body = ctx.message || String(code);
}
if (!res.headersSent) {
ctx.type = "text";
ctx.length = Buffer.byteLength(body);
}
return res.end(body);
}

// responses
if (Buffer.isBuffer(body)) return res.end(body);
if (typeof body === "string") return res.end(body);
if (body instanceof Stream) return body.pipe(res);

// body: json
body = JSON.stringify(body);
if (!res.headersSent) {
ctx.length = Buffer.byteLength(body);
}
res.end(body);
}

到此,一条经过中间件的请求响应过程就结束了,依赖中间件,可以实现其他强大的功能,如:路由。下面就介绍下 koa 中的内置对象吧。

Context 对象

context 是一个比较重要的对象,可以说是贯穿全文的,其创建实例位于 handleRequest 中调用 createContext(req,res)。其实就是存储一些上下文信息,比如 node 原始的 http.IncomingMessage, http.ServerResponse 对象,koa 内置的 Application.BaseRequest, Application.BaseResponse 对象等等。

先看一下大体结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
app: Application,
req: IncomingMessage,
res: ServerResponse,
state: {},
originalUrl:IncomingMessage.url,
request: {
prototype: Application.BaseRequest
},
response: {
prototype: Application.BaseResponse
},
prototype: {
prototype: {
inspect: Function,
toJSON: Function,
assert: httpAssert,
throw: Function,
onerror: Function,
get cookie: Function,
set cookie: Function,
}
}
}

以及通过 delegates, 一个代理属性的库 代理的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Response delegation.
*/

delegate(proto, "response")
.method("attachment")
.method("redirect")
.method("remove")
.method("vary")
.method("has")
.method("set")
.method("append")
.method("flushHeaders")
.access("status")
.access("message")
.access("body")
.access("length")
.access("type")
.access("lastModified")
.access("etag")
.getter("headerSent")
.getter("writable");

/**
* Request delegation.
*/

delegate(proto, "request")
.method("acceptsLanguages")
.method("acceptsEncodings")
.method("acceptsCharsets")
.method("accepts")
.method("get")
.method("is")
.access("querystring")
.access("idempotent")
.access("socket")
.access("search")
.access("method")
.access("query")
.access("path")
.access("url")
.access("accept")
.getter("origin")
.getter("href")
.getter("subdomains")
.getter("protocol")
.getter("host")
.getter("hostname")
.getter("URL")
.getter("header")
.getter("headers")
.getter("secure")
.getter("stale")
.getter("fresh")
.getter("ips")
.getter("ip");

之前 statusCode = 404, 而后面返回的确实 200,这其实是在 ctx.body = xxx, 时设置了状态码,而 ctx.body 是在 Application.BaseResponse 对象中设置的 getter, setter 属性。

Application.BaseResponse 对象

通过对原生 http.ServerResponse 进行代理,设置响应相关的配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
prototype: {
get socket: http.ServerResponse.socket,
get header: http.ServerResponse.getHeaders | http.IncomingMessage._headers,
get headers: this.header,
get status: http.IncomingMessage.statusCode,
set status: Function, // 1. 根据状态码设置 body 2. 设置http.IncomingMessage.statusCode
get message: http.IncomingMessage.statusMessage | statuses[this.status],
set message: Function, // 设置 http.IncomingMessage.statusMessage
get body: this._body,
set body: Function,// 设置 _body 有 stream、string、json等,以及一些响应头
get length: http.ServerResponse.getHeader('Content-Length') | Buffer.byteLength(JSON.stringify(body)),
set length: http.ServerResponse.setHeader('Content-Length'),
get headerSent: http.ServerResponse.headersSent, // 标记响应头是否已发送
vary: Function, // 设置 vary header
redirect: Function, // 设置重定向相关的如302, Location 头
attachment: Function, // Content-Disposition 文件下载相关
set type: Function, // Content-type
get type: Function,
set lastModified: Function, // Last-Modified
get etag: Function,
set etag: Function,
has: Function, // 判断响应头是否有指定字段
set: Function, // 设置响应头是指定字段
append: Function, // 追加响应头某个字段
remove: Function, // 一处某个响应头字段
writable: http.ServerResponse.socket.writable | http.ServerResponse.writableEnded/finished,
toJSON: Function,
flushHeaders: http.ServerResponse.flushHeaders,
}
}

Application.BaseRequest 对象

与 BaseResponse 对象类似,这里是对 http.IncomingMessage 进行的一个中间代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
prototype: {
get header: http.IncomingMessage.headers,
set header: http.IncomingMessage.headers,
get headers: http.IncomingMessage.headers,
set headers: http.IncomingMessage.headers,
get url: http.IncomingMessage.url,
set url: http.IncomingMessage.url,
get origin: `${this.protocol}://${this.host}`,
get href: this.originalUrl | this.origin + this.originalUrl,
get method: http.IncomingMessage.method,
set method: http.IncomingMessage.method,
get path: http.IncomingMessage.pathname,
set path: http.IncomingMessage.url,
get query: String, // 返回 queryString
set query: String, // 设置 this.querystring
get querystring: Function, // 返回 http.IncomingMessage.query || ''
set querystring: Function, // 设置 this.url
get search: `?${this.querystring}`,
set search: this.querystring = str,
get host: String, // 请求头的 host, 支持 x-Forwarded-Host
get hostname: String, // host 不带端口
get URL: URL,
get fresh: Function, // 判断客户端是否过期
get stale: !this.fresh,
get idempotent: // 检查请求是否是幂等的
get socket: http.IncomingMessage.socket,
get charset: String, // 获取 content-type 中的 charset
get length: Content-Length,
get protocol: String, // 返回协议字符串“http”或“https”
get secure: Boolean, // this.protocol === "https"
get ips: Array, // 含有 app.proxy = true 时
get ip: String, // this.socket.remoteAddress || this.ips[0]
get subdomains: String[], // 返回子域[]
get accept: Object, // Get accept object.
set accept: Function, // Set this._accept
accepts: Function,
acceptsEncodings: Function, // Return accepted encodings or best fit based on `encodings`
acceptsCharsets: Function, // Return accepted charsets or best fit based on `charsets`
acceptsLanguages: Function, // Return accepted languages or best fit based on `langs`
is: Function, // 检查传入请求是否包含“Content Type”标头字段,以及是否包含任何给定的mime“Type”。如果没有请求正文,则返回“null”。如果没有内容类型,则返回“false”。否则,它将返回第一个匹配的“类型”。
get type: String, // Content-type
get: Function, // 获取请求头的某个字段
toJSON: Function,
}
}

Reference

https://www.npmjs.com/package/statuses
https://www.npmjs.com/package/vary
https://www.npmjs.com/package/delegates
https://www.npmjs.com/package/content-type

-------------本文结束感谢您的阅读-------------

欢迎关注我的其它发布渠道