更新于 

第四章 异步编程

函数式编程

高阶函数

即将函数作为参数或返回值的函数

1
2
3
4
5
function foo(x){
return function(){
return x;
}
}

高阶函数是回调函数深层嵌套的基础

偏函数

偏函数用于创建一系列具有相同特征的函数,
即所谓的工厂函数。

偏函数案例1

重复实现类型判断函数:

1
2
3
4
5
6
let isString = function(obj){
return toString.call(obj) == '[object Stirng]'
}
let isFunction = function(obj){
return toString.call(obj) == '[object Function]'
}

引入类型判断工厂函数:

1
2
3
4
5
let isType = function(type){
return function(obj){
return toString.call(obj) == `[object ${type}]`;
}
}
偏函数案例2

源自著名类库Underscoreafter() 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 生成一个需要调用多次才能执行的函数
let _after = function(time,func){
if(time<=0) return func()
return function(){
if(--time <= 0){
return func.apply(this,arguments)
}
}
}
let after_hello = _after(3,function(){console.log('hello')})
after_hello() // -
after_hello() // -
after_hello() // hello

异步编程的优势与难点

实现异步的一般方式即事件循环线程池机制,
但还有一种方式:

通过C/C++调用操作系统底层接口,手动实现异步I/O

优势

Node与云计算

非阻塞I/O可以使CPU与I/O不用相互等待,
这种组织资源的方式符合分布式的需要

异步I/O调用示意图
异步I/O调用示意图
同步I/O调用示意图
同步I/O调用示意图
I/O密集与CPU密集

事件循环中,JavaScript和I/O线程池分饰两角:

  • JavaScript(政客):分配任务+处理结果
  • I/O线程池(公务员):执行任务

最典型的应用即UI编程
因此Node更适合I/O密集型场景

Node在CPU密集型应用的优化

V8的优化使得JavaScript本身的性能就很出色,
但Node在CPU密集型上的性能还需要考虑JavaScript代码组织的方式,
简单来说:只要CPU计算不影响I/O的调度,就没有问题。

建议

CPU耗用不超过10ms,
可以将大量的计算分解为小量计算,然后通过setImmediate调度

难点

无法通过try/catch操作抓住异步I/O回调函数的错误

1
2
3
4
5
6
7
8
9
10
11
const async = function (callback) {
process.nextTick(callback)
}
const err = function () {
throw new Error('Catch me if you can')
}
try {
async(err)
} catch (e) {
console.log('catch you!') // catch无效
}

为了捕捉回调函数的异常,出现一种约定:

将异常作为回调函数的第一个实参

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function async(callback) { 
process.nextTick(function () {
let results = 'hello'
try {
// 处理过程存在错误
throw new Error('catch me if you can')
} catch (e) {
callback(e) // 返回错误
return
}
callback(null,results)
})
}

function callback(err, results) {
if (err) {
console.log('catch you!')
} else {
console.log(results)
}
}

async(callback)

当多个操作之间存在依赖关系时,就会出现下面这种结构:

1
2
3
4
5
6
7
8
// 先获取模板
fs.readFile(template_path, 'utf_8', function(err, template){
db.query(sql, function(err, data){ // 再获取数据
110n.get(function(err,resources){ // 最终获取资源

})
})
})

JavaScript无法实现线程沉睡功能
下面是一种相当糟糕的模拟sleep的方式:

1
2
3
let start = new Date()
// CPU:这什么失眠操作?
while(new Date() - start<1000){}

当提到JavaScript的单线程执行时,

  • 浏览器层面指的是
    • UI渲染和JavaScript执行用的是同一个线程
  • Node层面则指
    • Node进程无法充分利用多核CPU执行
Web Workers

浏览器端提出的Web Workers具有以下几种作用:

  • 将JavaScript执行与UI渲染分离
    • 使用CPU减少阻塞UI渲染问题
    • 但并没有优化UI渲染效率
  • 可利用多核CPU进行大量计算
  • 利用消息机制的理想模型
    Web Workers的消息机制
    Web Workers的消息机制

Web Workers由于浏览器标准原因并没有广泛推广开来,
但Node中借鉴了Web Workers的模式:

  • child_process作为基础应用
  • cluster模块为更深层次应用

异步编程风格对开发者的副作用不少,如:

  • 嵌套回调
  • 业务分散

但通过良好的流程控制,还是能够将逻辑梳理成顺序形式

异步编程解决方案

事件发布/订阅模式

events
1
2
3
4
5
6
7
const EventEmitter = require('node:events')
class MyEmitter extends EventEmitter{}
const myEmitter = new MyEmitter()
myEmitter.on('event',()=>{ //添加监听器
console.log('event occured!')
})
myEmitter.emit('event') // event occured!

emit调用的回调函数是异步触发

1
2
3
4
5
const emit = new EventEmitter() //事件发布者
const eventName = 'event' // 事件名称
emit.on(eventName, 侦听器1)
emit.on(eventName, 侦听器2)
emit.on(eventName, 侦听器3)
事件发布/订阅模式的几个主要作用

一个事件能够绑定多个侦听器,
事件发布者无需关注订阅侦听器如何实现业务逻辑。

事件的设计关乎外部调用组件时是否优雅

钩子(hook)机制即导出内部数据或状态给外部调用者,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const http = require('http')
const options = {
host: '127.0.0.1',
port: 8080,
path: '/anypath',
method: 'POST'
}
const req = http.require(options, function (res) {
console.log(`STATUS: ${res.statusCode}`)
console.log(`HEADERS: ${JSON.stringify(res.headers)}`)
res.setEncoding('utf8')
// 获取数据钩子
res.on('data', function (chunk) {
console.log(`BODY: ${chunk}`)
})
// 结束连接钩子
res.on('end',function(){})
// 报错钩子
res.on('error', function (e) {
console.log(`problem with request: ${e.message}`)
})
req.write('data\n')
req.write('data\n')
req.end()
})
事件发布/订阅机制的健壮性

一个事件添加了超过10条侦听器会得到一条警告:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* MaxListenersExceededWarning:
* Possible EventEmitter memory leak detected.
* 11 triggers listeners added to [EventEmitter].
* Use emitter.setMaxListeners() to increase limit
*/
const EventEmitter = require('node:events')
const emitter = new EventEmitter()
for (let i = 0; i < 11; i++){
emitter.on('event',function(){})
}
emitter.emit('event')

这是为了防止内存泄漏而设置的警告,
可以通过调用setMaxListeners去掉限制:

1
2
3
4
5
const emitter = new EventEmitter()
emitter.setMaxListeners(0) //
for (let i = 0; i < 11; i++){
emitter.on('event',function(){})
}
error事件

如果错误触发了error事件,EventEmitter会对error事件进行检测:

  • 注册过error事件,将交由侦听器处理
  • 未注册过,作为异常抛出
补充点
继承events模块

Node的util模块中封装了继承的方法inherits

1
2
3
4
5
6
7
8
9
10
11
12
const events = require('events')
const util = require('util')
function Stream() {
events.EventEmitter.call(this)
}
// Stream继承了EventEmitter
util.inherits(Stream, events.EventEmitter)
const emitter = new Stream()
emitter.on('event', function () {
console.log('hello world')
})
emitter.emit('event') // hello world
利用事件队列解决雪崩问题

雪崩的一种典型场景:用户重试导致系统负载升高,集中到服务器端即重复事件响应

所谓雪崩,就是在高访问量、大并发量的情况下缓存失效的情景,
如下是一次sql查询的调用

1
2
3
4
5
let select = function (callback) {
db.select('SQL', function (results) {
callback(results)
})
}

这时如果同时出现下面两种情况就会出现雪崩:

  • 站点刚启动,缓存中不存在数据
  • 访问量巨大,同一条SQL发送到数据库中重复查询

因此可以引入状态锁

1
2
3
4
5
6
7
8
9
10
let _status = 'ready' // 状态锁
const select = function (callback) {
if (_status === 'ready') {
_status = 'pending' //暂时拒绝受理其他请求
db.select('SQL', function (results) {
_status = 'ready' //受理结束后将状态重新修改为ready
callback(results)
})
}
}

但只有第一个发送的请求能获取到数据,其他的请求都被pending拒之门外了

解决方法:once

通过once添加的侦听器只会执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
const events = require('events')
const proxy = new events.EventEmitter()
let status = 'ready'
function selected(callback) {
proxy.once('select', callback)
if (status === 'ready') {
status = 'pending'
db.select("SQL", function (results) {
proxy.emit('selected', results)
status = 'ready'
})
}
}

Gearman异步应用框架也能实现这种效果

多异步之间的协作方案

一个业务逻辑需要依赖多个通过回调或事件传递的结果时,
就可能出现嵌套过深的问题:

1
2
3
4
5
6
7
8
9
10
function handle(){
// 有些回调函数看起来是并行执行,实际完全是串行
onehandler(()=>{
twohandler(()=>{
threehandler(()=>{
// 别问,问就是回调艺术
})
})
})
}
案例:回调嵌套过深的避免

渲染页面需要异步请求三类资源:

  • 模板
  • 数据
  • 本地化资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let count = 0 // 哨兵变量
let results = {}
function done(key, value) {
count++
results[key] = value
if (count >= 3) {
render(results)
}
}
// 读取模板
fs.readFile(template_path, "utf8", (err, template) => {
done('template', template)
})
// 读取数据
db.query(sql, (err, data) => {
done('data', data)
})
// 读取本地化资源
l10n.get((err, resources) => {
done('resources', resources)
})

哨兵变量:借助一个第三方函数/变量来处理异步协作的结果

偏函数+哨兵变量
1
2
3
4
5
6
7
8
9
10
11
12
function after(time, callback) {
let result = {}
let count = 0
return function (key, value) {
result[key] = value
count++
if (count >= time) {
callback(result)
}
}
}
const done = after(3, render)
事件与侦听器的关系

一对多 通过订阅模式实现侦听器发散

1
2
emitter.on('event',listener1)
emitter.on('event',listener2)

多对一 通过偏函数实现事件收敛

1
2
3
const done = after(times, callback)
emitter.on('event1', done)
emitter.on('event2', done)
EventProxy的原理
EventProxy.all

EventProxy.all(e1,e2,…,listener)
listener参数顺序订阅组合事件列表的参数一一对应
当每个事件都被触发之后,侦听器才会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const EventProxy = require('./node_modules/eventproxy/index')
const proxy = new EventProxy()
proxy.all(
'template', 'data', 'resources',
function (template, data, resources) {
console.log(...arguments) // template data resources
}
)
proxy.on('template', res => res)
proxy.on('data', res => res)
proxy.on('resources', res => res)
proxy.emit('template','template')
proxy.emit('data','data')
proxy.emit('resources','resources')
EventProxy.tail

EventProxy.tail与all的区别在于:
组合事件中某个事件被再次触发
侦听器会使用最新的数据继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const EventProxy = require('./node_modules/eventproxy/index')
const proxy = new EventProxy()
proxy.on('eventA', res => res)
proxy.on('eventB', res => res)

proxy.all('eventA', 'eventB', (a, b) => {
console.log(`all: ${a} ${b}`)
})
proxy.tail('eventA', 'eventB', (a, b) => {
console.log(`tall: ${a} ${b}`)
})

proxy.emit('eventA', 'AAA')
proxy.emit('eventB', 'BBB')
proxy.emit('eventA', 'AAAAAAA')

// output:
// all: AAA BBB
// tail: AAA BBB
// tail: AAAAAAA BBB

EventProxy.after

EventProxy.after(event, time, listener)
listener会在event事件触发time次后执行

1
2
3
4
5
6
7
8
9
const EventProxy = require('./node_modules/eventproxy/index')
const proxy = new EventProxy()
proxy.after('event', 3, (data) => {
console.log(data) // ['AAA', 'BBB', 'CCC']
})
proxy.on('event', res => res)
proxy.emit('event', 'AAA')
proxy.emit('event', 'BBB')
proxy.emit('event', 'CCC')
EventProxy基本原理

EventProxy来自于BackboneJs的事件模块

EventProxy的异常处理

手动捕捉每一个异步请求的异常非常麻烦:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const EventProxy = require('./node_modules/eventproxy/index')
const ep = new EventProxy()
ep.all('tpl', 'data', function (tpl, data) {
callback(null, {
template: tpl,
data: data
})
})
// 手动绑定异常处理
ep.bind('error', function (err) {
ep.unbind()
callback(err)
})
fs.readFile('template.tpl', 'utf-8', function (err, content) {
if (err) {return ep.emit('error',err)}
ep.emit('tpl', content)
})
db.get('some sql', function (err, result) {
if (err) {return ep.emit('error', err)}
ep.emit('data',result)
})
fail && done

EventProxy.fail 绑定错误处理函数
EventProxy.done 进行异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function callback(err, res) {
if (err) {
console.log(err)
return
}
console.log(res)
}
const EventProxy = require('./node_modules/eventproxy/index')
const ep = new EventProxy()
ep.all('eventA', 'eventB', function (returnA, returnB) {
callback(null, {
returnA,
returnB
})
})
ep.fail(callback)
ep.on('eventA', res => res)
ep.on('eventB', res => res)
ep.done('eventA')(null, 'AAA')
ep.done('eventB')('eventB-error', 'BBB') // eventB-error
fail
1
ep.fail(callback)

等同于

1
2
3
ep.fail(function(err){
callback(err)
})

等同于

1
2
3
4
ep.bind('error', function(err){
ep.unbind()
callback(err)
})
done

done方法实际上是对事件触发的一种包装

ep.done(eventName)
1
ep.done('event')

等同于

1
2
3
4
5
6
function (err, content){
if(err){
return ep.emit('error', err)
}
ep.emit('event', content)
}
ep.done(contentFun)

done中也只传一个回调函数,
但需要在函数中手动emit

1
2
3
ep.done((content)=>{
ep.emit('event',content)
})

等同于

1
2
3
4
5
6
7
8
function(err,content){
if(err){
return ep.emit('error',err)
}
(function(content){
ep.emit('event',content)
}(content))
}
ep.done(eventName, contentFun)

也可以即传入事件名称,又传入回调函数,
这里的回调函数主要是用于处理返回值的

1
2
3
4
ep.done('event',function(content){
// 对content做一些处理
return content
})

等同于

1
2
3
4
5
6
7
8
9
function(err, content){
if(err){
return ep.emit('error', err)
}
ep.emit('event',(function(content){
// 对content做一些处理
return content
})(content))
}

Promise/Deferred模式

在事件订阅模型中,事件执行后的回调函数需要在注册时就明确指定

1
emitter.emit('event',callback)

Promise/Deferred模式可以说是一种先执行异步调用延迟处理的模式,
这种模式最早出现在Dojo中:

异步Promise/Deferred模型

Promises/A对异步操作下了三个抽象定义:

  • Promise操作仅存在3种状态:
    • 未完成态
    • 完成态
    • 失败态
  • Promise状态只存在两种转化过程:
    • 未完成态 → 完成态
    • 未完成态 → 失败态
  • Promise状态一旦转变,将不能被更改
then方法

then方法实际做的事就是:将回调函数存放起来

1
then(fulfilledHandler, errorHandler, progressHandler)
  • fullfilledHandler 完成态回调方法
  • errorHandler 错误态回调方法
  • progressHandler progress事件回调方法
使用events完成then的简单实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const EventEmitter = require('events').EventEmitter
const Promise = function () {
EventEmitter.call(this)
}
util.inherits(Promise, EventEmitter)
Promise.prototype.then = function (fullfilledHandler, errorHandler, progressHandler) {
if (typeof fullfilledHandler === 'function') {
this.once('success', fullfilledHandler)
}
if (typeof errorHandler === 'function') {
this.once('error', errorHandler)
}
if (typeof progressHandler === 'function') {
this.once('progress', progressHandler)
}
return this
}

事件都注册之后,如何触发这些事件?

Deferred 延迟对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let Deferred = function () {
this.state = 'unfilfilled'
this.promise = new Promise()
}
Deferred.prototype.resolve = function (obj) {
this.state = 'fulfilled'
this.promise.emit('success', obj)
}
Deferred.prototype.reject = function (err) {
this.state = 'failed'
this.promise.emit('error', err)
}
Deferred.prototype.progress = function (data) {
this.promise.emit('progress', data)
}
响应对象的封装

典型响应对象的封装:

1
2
3
4
5
6
7
8
9
10
res.setEncoding('utf8')
res.on('data', function (chunk) {
console.log('BODY: ' + chunk)
})
res.on('end', function () {
// Done
})
res.on('error', function (err) {
// Error
})

使用then进行封装:

1
2
3
4
5
6
7
res.then(function () {
// Done
}, function (err) {
// Error
}, function (chunk) {
console.log('BODY: ' + chunk)
})

引入Deferred对象的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const promisify = function (res) {
let deferred = new Deferred()
let result = ''
res.on('data', function (chunk) {
result += chunk
deferred.progress(chunk)
})
res.on('end', function () {
deferred.resolve(result)
})
res.on('error', function (err) {
deferred.reject(err)
})
return deferred.promise
}

promisify(res).then(function () {
// Done
}, function (err) {
// Error
}, function (chunk) {
// progress
console.log('BODY: ' + chunk)
})
Promise && Deferred
  • Deferred
    • 用于内部
    • 维护异步模型状态
    • 封装业务中的不变部分
  • Promise
    • 用于外部
    • 通过then方法给外部提供添加自定义逻辑入口
    • 控制业务的可变部分

Deferred与业务的简化

并不是使用Deferred都可以简化业务,
因为对于不同的业务场景,都需要封装和改造Deferred,
对于不常使用的场景,考虑到封装和改造Deferred花费的时间,并不一定真正简洁划算。

Promise与event

  • Promise
    • 高级接口
    • 一旦定义,不太容易变化,缺少灵活性
    • 方便解决典型问题
  • events
    • 低级接口
    • 方便构造更加复杂的场景
Q model
1
2
3
4
5
6
7
8
9
10
11
12
13
14
const Q = require('q')
const fs = require('fs')
const readFile = function (file, encoding) {
let deferred = Q.defer()
fs.readFile(file, encoding, deferred.makeNodeResolver())
return deferred.promise
}
readFile('hello.txt', 'utf-8').then(function (data) {
// success case
console.log(data)
}, function (err) {
// failed case
console.log(err)
})

异步编程中异常的判断和处理非常重要,
Promise能够实现:

  • 正向用例(success)和反向用例(error)的分离
  • 逻辑处理延迟(then)
Promise的多异步协作

all的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Deferred.prototype.all = function (promises) {
let count = promises.length
let that = this
let results = []
promises.forEach(function (promise, i) {
promise.then(function (data) {
count--
results[i] = data
if (count === 0) {
that.resolve(resolves)
}
}, function (err) {
that.reject(err)
})
})
return this.promise
}

all的使用

1
2
3
4
5
6
7
let promise1 = readFile('hello.txt', 'utf-8')
let promise2 = readFile('world.txt', 'utf-8')
let deferred = new Deferred()
deferred.all([promise1, promise2]).then(
res => { },
err => { }
)
Promise进阶知识

Promise需要根据具体使用场景进行不同的封装,
与原生事件相比灵活性低一些,
但是更加适用于一些经典场景

前端自动化测试

在编写自动化测试时,前端程序员使用JavaScript可以减轻切换环境的麻烦,
但是使用Node与远程服务器进行指令发送时,
使用到的网络库是完全异步的(而其它语言都是同步调用的),
因此在多个异步操作之间存在逻辑关系时,会采用Deferred模式

Pyramid of Doom

作为介绍Promise队列操作的铺垫,
有必要提一下大名鼎鼎的Pyramid of Doom,即恶魔金字塔

可爱nie~~~
可爱nie~~~

为了完成一系列彼此间存在逻辑关系的异步操作,
有如下三种地狱模式可供选择:

1
2
3
4
5
6
7
8
9
obj.api1(function (value1) {
obj.api2(value1, function (value2) {
obj.api3(value2, function (value3) {
obj.api4(value3, function (value4) {
callback(value4) // Pyramid of Doom!!!
})
})
})
})
1
2
3
4
5
6
7
8
9
10
11
12
13
let handler1 = function (value1) {
obj.api2(value1, handler2)
}
let handler2 = function (value2) {
obj.api3(value2, handler3)
}
let handler3 = function (value3) {
obj.api4(value3, handler4)
}
let handler4 = function (value4) {
callback(value4)
}
obj.api1(handler1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 保证你同事看到会口吐芬芳的代码
const events = require('events')
const emitter = new events.EventEmitter()

emitter.on('step1', function () {
obj.api1(function (value1) {
emitter.emit('step2', value1)
})
})
emitter.on('step2', function (value1) {
obj.api2(value1, function (value2) {
emitter.emit('step3', value2)
})
})
emitter.on('step3', function (value2) {
obj.api3(value2, function (value3) {
emitter.emit('step4', value3)
})
})
emitter.on('step4', function (value3) {
obj.api4(value3, function (value4) {
callback(value4)
})
})
// 同事:你在这打地鼠呢?
emitter.emit('step1')
Promise链式调用原理
1
2
3
4
5
6
7
8
9
10
11
promise()
.then(obj.api1)
.then(obj.api2)
.then(obj.api3)
.then(obj.api4)
.then(function (value4) {
// success
}, function (err) {
// error
})
.done()

改造Promise以实现链式调用(两步):

  • 准备好回调队列
  • 逐个执行回调,当回调返回新的Promise对象时:
    • 新Promise对象继承回调队列
    • Deferred对象更新为新Promise对象
      1
      2
      3
      4
      5
      6
      7
      let Promise = function(){
      this.queue = []
      this.isPromise = true
      }
      let Deferred = function(){
      this.promise = new Promise()
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      Deferred.prototype.resolve = function(obj){
      let promise = this.promise
      let handler
      while((handler = promise.queue.shift())){
      if(handler && (handler.fulfilled)){
      let ret = handler.fulfilled(obj)
      if(ret && ret.isPromise){
      ret.queue = promise.queue
      this.promise = ret
      return
      }
      }
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      Deferred.prototype.reject = function (err) {
      let promise = this.promise
      let handler
      while ((handler = promise.queue.shift())) {
      if (handler && handler.error) {
      let ret = handler.error(err)
      if (ret && ret.isPromise) {
      ret.queue = queue.queue
      this.promise = ret
      return
      }
      }
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      Deferred.prototype.callback = function(){
      const that = this
      return function(err, file){
      if(err){
      return that.reject(err)
      }
      return that.resolve(file)
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      Promise.prototype.then = function(fulfilledHandler, errorHandler){
      let hander = {}
      if(typeof fulfilledHandler === 'function'){
      handler.fulfilled = fulfilledHandler
      }
      if(typeof errorHandler === 'function'){
      hander.error = errorHandler
      }
      this.queue.push(handler)
      return this
      }

使用链式调用改造回调地狱案例:

1
2
3
4
5
6
7
8
9
10
obj.api1(value1)
.then((value2)=>{
return obj.api2(value2)
})
.then((value3)=>{
return obj.api3(value3)
})
.then((value4)=>{
callback(value4)
})
批量将方法Promise异步化

这样如果希望能实现依次读取文件的功能,
需要对读取文件函数进行异步包装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const fs = require('fs')
const Q = require('q')
// 异步包装函数
function readFile(file, encoding) {
let defer = Q.defer()
fs.readFile(file, encoding, defer.makeNodeResolver())
return defer.promise
}
readFile('file1.txt', 'utf-8')
.then(res => {
console.log(res)
return readFile(res,'utf-8')
})
.then(res => {
console.log(res)
})

也可以构建能够对任何方法进行异步包装的偏函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const fs = require('fs')
const Q = require('q')
// 批量对方法进行Promise化
function smooth(method) {
return function () {
let deferred = Q.defer() // new Deferred()
let args = Array.prototype.slice.call(arguments, 0)
args.push(deferred.makeNodeResolver()) //deferred.callback()
method.apply(null, args)
return deferred.promise
}
}
const readFile = smooth(fs.readFile)
readFile('file1.txt', 'utf-8')
.then(res => {
console.log(res)
return readFile(res,'utf-8')
})
.then(res => {
console.log(res)
})

流程控制库

除了事件/订阅Promise/Defer模式以外,还有一些尝试用的流程控制库:

  • Connect
  • async
  • Step
  • wind
尾触发与Next

尾触发: 指需要手工调用才能持续执行后续调用的方法
next: 尾触发的常见关键词
Connect中间件: 尾触发目前应用最多的地方

1
2
3
4
5
6
7
8
9
const connect = require('connect')
const fs = require('fs')
let app = connect()
app.use(function middleware1(req, res, next) {
next()
})
app.use(function middleware2(req, res, next) {
next()
})
请求对象、响应对象、尾触发函数处理流
请求对象、响应对象、尾触发函数处理流

MiddleWare 中间件机制

中间件机制在处理网络请求时,
可以像面向切面编程一样进行过滤、验证、日志等功能,
不与业务逻辑产生耦合关联

面向切面编程 AOP

AOP Aspect Oriented Program
简单来说,就是在运行中动态将代码切入到类的指定方法、指定位置,
是对OOP编程的一种补充。

connect核心实现

app.stack存放服务器内部维护的中间件队列

1
2
3
4
5
6
7
8
9
10
11
function createServer() { 
function app(req, res) { app.handle(req, res) }
utils.merge(app, proto)
utils.merge(app, EventEmitter.prototype)
app.route = '/'
app.stack = [] // 中间件队列
for (let i = 0; i < arguments.length; ++i){
app.use(arguments[i])
}
return app
}

app.use用于将中间件放入stack队列

1
2
3
4
5
6
7
app.use = function(route, fn){
this.stack.push({
route: route,
handle: fn
})
return this
}

app.listen监听函数借助http模块实现

1
2
3
4
5
const http = require('http')
app.listen = function(){
const server = http.createServer(this)
return server.listen.apply(server, arguments)
}

app.handle,每一个监听到的网络请求都从此处开始处理

1
2
3
app.handle = function(req, res, out){
next()
}

将队列中的中间件取出并执行,
随后递归调用、持续触发

1
2
3
4
5
6
function next(err) {
// some code
// next callback
layer = stack[index++]
layer.handle(req, res, next)
}
Connect的流式处理

Connect中的尾触发一般用于拆分对网络请求的处理,
能够将串行逻辑扁平化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const connect = require('connect')
const app = connect()
app.use('/', function (req, res, next) {
// step1
next()
})
app.use('/', function (req, res, next) {
// step2
next()
})
app.use('/', function (req, res, next) {
// step3
res.end()
})
http.createServer(app).listen(3000)
async

流程控制模块async提供用于处理异步的各种协作模式

async.series 无依赖的异步串行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const async = require('async')
const fs = require('fs')
async.series([
function (callback) {
fs.readFile('file1.txt','utf-8',(err,data)=>callback(null,data))
},
function (callback) {
fs.readFile('file2.txt','utf-8',(err,data)=>callback(null,data))
}
], function (err, results) {
if (err) {
throw err
}
console.log(results) //["file1 context","file2 context"]
})

async.series第一个参数事件数组中元素有如下结构:

1
2
3
4
(callback)=>{
// do something
callback(result)
}

这里的callback实际上与connect.next类似,
用于结束本次执行,开始下次执行,
并且其传参是用于将本次执行的结果保存起来的

手动实现异步并行执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
let counter = 2
let results = []
let hasErr = false
function done(data) {
counter -= 1
results.push(data)
if (counter == 0) {
callback(null, results)
}
}

function fail(err) {
if (!hasErr) {
hasErr = true
callback(err)
}
}

function callback(err, result) {
if (err) {
throw err
}
console.log(result)
}

fs.readFile('file1.txt', 'utf-8', (err, data) => {
if (err) {
fail(err)
return
}
done(data)
})
fs.readFile('file2.txt', 'utf-8', (err, data) => {
if (err) {
fail(err)
return
}
done(data)
})
EventProxy实现异步并行执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const EventProxy = require('eventproxy')
const fs = require('fs')
const proxy = new EventProxy()

function callback(err, res) {
if (err) throw err
console.log('result',res)
}

proxy.all('content', 'data', function (content, data) {
callback(null, [content, data])
})
proxy.fail(callback)

fs.readFile('file1.txt', 'utf-8', proxy.done('content'))
fs.readFile('file2.txt', 'utf-8', proxy.done('data'))

parallel 异步并行执行

1
2
3
4
5
6
7
8
9
10
11
12
async.parallel([
function (callback) {
fs.readFile('file1.txt','utf-8',callback)
},
function (callback) {
fs.readFile('file2.txt','utf-8',callback)
}],
function (err, res) {
if (err) { throw err }
console.log(res) // ['file1 content','file2 content']
}
)

waterfall 结果存在依赖关系的异步串行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async.waterfall([
function (callback) {
fs.readFile('file1.txt', 'utf-8', function (err, content) {
// err后面的参数代表传递给下一步的参数
callback(err,content)
})
},
function (arg1, callback) {
// arg1 = file2.txt
fs.readFile(arg1, 'utf-8', function (err, content) {
callback(err,content)
})
}
], function (err, res) {
if (err) throw err
console.log(res) // file content
})

auto 自动依赖处理

对于存在复杂依赖关系的业务环境,auto能够实现:
根据依赖关系自动分析,以最佳的顺序执行业务(强!!!)

1
2
3
4
5
6
7
8
9
10
let deps = {
fun1:callback=>callback(),
// fun2 依赖 fun3、fun4
fun2:['fun3','fun4',callback=>callback()],
// fun3 依赖 fun1
fun3:['fun1',callback=>callback()],
// fun4 依赖 fun1
fun4:['fun4',callback=>callback()]
}
async.auto(deps)

比如有以下一系列业务需要实现:

  • 从磁盘读取配置文件 readConfig
  • 根据配置文件连接MongoDB connectMongoDB
    • 依赖readConfig
  • 根据配置文件连接Redis connectRedis
    • 依赖readConfig
  • 编译静态文件 complieAsserts
  • 上传静态文件到CDN uploadAsserts
    • 依赖 compieAsserts
  • 启动服务器 startup
    • 依赖以上所有
EventProxy实现此场景
  • asap
    • 用于指定一个事件的回调函数尽快执行
  • assp
    • 指定一组事件的回调函数按照特定顺序执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const EventProxy = require('eventproxy')
const proxy = new EventProxy()
proxy.asap('readtheconfig', function () {
// 读取配置文件
proxy.emit('readConfig')
}).on('readConfig', function () {
// 连接数据库
proxy.emit('connectMongoDB')
}).on('readConfig', function () {
// 连接Redis
proxy.emit('connectRedis')
}).assp('complieasserts', function () {
// 编译静态文件
proxy.emit('complieAsserts')
}).on('complieAsserts', function () {
// 上传静态文件
proxy.emit('uploadAsserts')
}).all('connecctMongoDB', 'connectRedis', 'uploadAsserts', function () {
// 启动服务器
})

生成依赖树如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async.auto({
'readConfig': function (callback) {
// read config file
callback()
},
'connectMongoDB': ['readConfig', function (callback) {
// connect to mongodb
callback()
}],
'connectRedis': ['readConfig', function (callback) {
// connect to redis
callback()
}],
'complieAsserts': function (callback) {
// complie asserts
callback()
},
'uploadAsserts': ['complieAsserts', function (callback) {
// upload to assert
callback()
}],
'startup': [
'connectMongoDB',
'connectMongoDB',
'complieAsserts',
'uploadAsserts',
function (callback) {
// start up
callback()
}
]
})
Step

更加轻量级的流程控制库:Tim CaswellStep

1
2
// step提供的唯一接口用于串行任意数量的任务
step(task1,task2,task3)

step的this关键字实际上发挥的是next方法的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
const step = require('step')
const fs = require('fs')
step(
function readFile1() {
fs.readFile('file1.txt','utf-8',(err,data)=>this(data))
},
function readFile2(param) { // param = file2.txt
fs.readFile(param,'utf-8',this)
},
function done(err, content) {
console.log(content) //file content
}
)
并行任务执行 parallel

parallel: 告诉Step需要等所有任务完成时才能进行下一个任务

注意:step只能取到异步方法传回来的前2个参数,
此方法的原理在于计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let asyncCall = function (callback) {
process.nextTick(function () {
callback(null,'result1','result2')
})
}
step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this.parallel())
fs.readFile('file2.txt', 'utf-8', this.parallel())
asyncCall(this.parallel())
},
function done() {
/**
* 0:undefined
* 1:file2.txt
* 2:file content
* 3:result1
*/
console.log(arguments)
}
)
结果分组 group

group 从流程控制上说和parallel很相似,但是会将结果打包成数组传回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
step(
function readDir() {
fs.readdir(__dirname, this)
},
function readFiles(err, results) {
if (err) throw err
let group = this.group()
results.forEach(filename => {
if (/\.txt$/.test(filename)) {
fs.readFile(__dirname+'/'+filename,'utf-8',group())
}
})
},
function showAll(err, files) {
if (err) throw err
console.dir(files) //[ 'file2.txt', 'file content', 'file3 content' ]
}
)
为什么要调用两次group?

第一次调用:告知step要并行执行

1
let group = this.group()

第二次调用:用于生成回调函数接受返回值按组存储

1
fs.readFile(__dirname+'/'+filename,'utf-8',group())
wind
Monadic 单子

monadic,是一种用于处理具有副作用的计算,
如I/O操作、异常处理和状态管理等。

wind算是对JavaScript实现的一种Monadic扩展,
为了说明wind的具体使用场景,我们先提出一个比较特殊的需求:

实现冒泡排序的动画

冒泡排序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function swap(a, i, j) { 
let t = a[i]
a[i] = a[j]
a[j] = t
}
function compare(a, b) {
return (a-b) > 0
}
function bubbleSort(array) {
for (let i = 0; i < array.length; i++){
for (let j = 0; j < array.length-i-1; j++){
if (compare(array[j], array[j + 1])) {
swap(array,j,j+1)
}
}
}
return array
}

如果需要添加动画效果的话,只要在swap函数中添加就可以,
但是动画效果存在延时
在延时时段中动画还没有执行完,数组中的数据就又开始变化了,
因此该如何让动画执行完之后再进行下一次的数据交换呢?

需要达成的效果
需要达成的效果
同样需要通过延时实现的动画效果
同样需要通过延时实现的动画效果
使用yield实现sleep

也可以使用ES6中的yield实现流程控制,
在每次冒泡交换之后添加yield阻塞流程,
转而花费n秒控制动画延时,
动画执行结束,再手动调用生成器函数的next方法。

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function * bubbleSort(array){
for(let i=0;i<array.length;i++){
for(let j=0;j<array.length-i-1;j++){
if(compare(array[j],array[j+1])){
swap(array,j,j+1)
yield 'stop' //交换后阻塞
}
}
}
}
function next(gen){ // 需要迭代调用进行流程控制
let n = gen.next()
if(n.done){
return
}
paint().then(()=>next(gen)) //异步绘制后再次调用next通过下一次yield
}

let gen = bubbleSort(arr)
next(gen)

使用wind实现此功能代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const Wind = require('wind')

// swap进行异步包装
let swapAsync = eval(Wind.compile("async", function swap(a, i, j) {
$await(Wind.Async.sleep(500)) // 暂停500s
var t = a[i]
a[i] = a[j]
a[j] = t
console.log('repaint') //重绘
}))

function compare(a, b) {
return (a - b) > 0
}

let bubbleSort = eval(Wind.compile("async", function (array) {
for(var i = 0;i<array.length;i++){
for(var j = 0; j < array.length - i -1;j++){
if(compare(array[j],array[j+1])){
$await(swapAsync(array,j,j+1))
}
}
}
}))

bubbleSort([7,-3,10,-1,0,99]).start()
输出结果
输出结果
wind的特异之处

Wind.compile()对普通函数进行编译
再交给 eval() 执行

定义异步任务

1
eval(Wind.compile("async",function(){}))

内置对setTimeout()的封装

1
Wind.Async.sleep()
1
$await(asyncFun())

$await 实际是一个等待的 占位符
表示等待完成异步方法

任务模型

每一个异步操作都可以转换为一个任务
这也就是wind基于的任务模型

正式的任务创建方法:Task.create()/complete()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
const Wind = require('wind')
const Task = Wind.Async.Task
const fs = require('fs')

// 任务包装
let readFileAsync = function (file, encoding) {
return Task.create(function (t) {
fs.readFile(file, encoding, function (err, file) {
if (err) {
t.complete('failure', err)
} else {
t.complete('success', file)
}
})
})
}

// 异步执行
let serial = eval(Wind.compile("async", function () {
var file1 = $await(readFileAsync('file1.txt', 'utf-8'))
console.log(file1)
var file2 = $await(readFileAsync("file2.txt", 'utf-8'))
console.log(file2)
try {
var file3 = $await(readFileAsync("file4.txt",'utf-8'))
} catch (err) {
console.log(err)
}
}))

// file1 content
// file2 content
// Error
serial().start()

异步并行

直接使用 $await(asyncTask()) 的方式能实现异步任务串行执行,
实现了不阻塞CPU阻塞代码的目的

Wind实现异步任务并行执行:Task.whenAll

1
2
3
4
5
6
7
8
9
let parallel = eval(Wind.compile('async', function () {
var results = $await(Task.whenAll({
file1: readFileAsync('file1.txt', 'utf-8'),
file2: readFileAsync('file2.txt','utf-8')
}))
console.log(results)
}))
// { file1: 'file2.txt', file2: 'file content' }
parallel().start()
1
2
3
4
5
let series = eval(Wind.compile('async',function(){
// ...
$await(asyncTask())
// to do
}))

可以看到引入wind之后,异步函数实际上已经和同步调用十分近似了
因此很适合用于从已有的同步代码向Node端迁移
wind提供了两个辅助方法:

  • Wind.Async.Binding.fromCallback
  • Wind.Async.Binding.fromStandard

fromCallback 转换无异常的调用

无异常的调用通常只有一个参数返回:

1
2
3
4
5
fs.exists("/etc/passwd",function(exists){
// exists表示指定路径文件是否存在
})
// 异步包装
let existsAsync = Wind.Async.Binding.fromStandard(fs.exists)

fromStandard 转换带异常的调用

带异常的调用的第一个参数作为异常标识

1
2
3
4
fs.readFile('file1.txt',function(err,data){
// err表示异常
})
let readFileAsync = Wind.Async.Binding.fromStandard(fs.readFile)

小结

方案对比
  • 事件发布/订阅模式
    • 比较原始
  • Promise/Deferred模式
    • 异步任务模型进行抽象
    • 重点在于封装异步的调用
  • 流程控制库
    • 自由度较高
    • 重点在于回调函数的注入
    • 源码编译方案
      • streamline

异步并发控制

所谓异步并发性能的关键可以总结为这样一个问题:

怎样发起异步并发,能够实现既充分压榨底层系统的性能,同时给予其一定过载保护,防止过犹不及?

泪目,资本家竟是我自己

如果不对并发量作限制的话,连续大量的并发请求会迅速用光下层服务器的资源,
针对并发量的限制问题,这里给出了两个解决方案:

  • bagpipe
  • async.parallelLimit / async.queue

bagpipe解决方案

bagpipe解决思路
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
let allTaskList = [] // 调用缓存队列
let activeTaskList = [] // 活跃(调用发起但并未执行回调)的异步调用
let limitNumber = 100 // 并发限定量
function parallelControl(){
// 如果还有多余的任务名额,取出执行
if(allTaskList.length && activeTaskList.length<limitNumber){
let task = allTaskList.shift()
activeTaskList.push(task)
task.then((res)=>{
activeTaskList.del(task) //销毁
parallelControl() // 下一次取出
})
}else{
parallelControl()
}
}

实际使用到两个bagpipe的API:

  • push方法
  • full事件
1
2
3
4
5
6
7
8
9
const Bagpipe = require('bagpipe')
const fs = require('fs')
let bagpipe = new Bagpipe(3)
bagpipe.on('full',function(length){
console.log(`query block:[${length}]`)
})
for(let i = 0; i<5; i++){
bagpipe.push(fs.readFile,'file1.txt','utf-8',function(err,data){})
}
bagpipe的配置
  • disabled
    • 限流开关
  • refuse
    • 拒绝模式
    • 针对对实时性要求高的场景
    • 限流分批发起请求可能导致一部分请求必须等待一段时间,无法满足丢时间要求较高的数据请求
    • 并行队列一旦阻塞就返回失败报错
  • timeout
    • 用于防止执行过慢的请求长时间占用并发名额

async的解决方案

async.parallelLimit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const async = require('async')
const fs = require('fs')
async.parallelLimit([
function (callback) {
fs.readFile('file1.txt','utf-8',callback)
},
function (callback) {
fs.readFile('file2.txt','utf-8',callback)
}
], 1, function (err, results) {
if (err) {
throw err
}
//[ 'file2.txt', 'file content' ]
console.log(results)
})
1
2
3
4
5
async.parallelLimit(
[], //异步并发任务队列
limit, //并发数
callback
)
async.queue

async.parallelLimit无法动态增加并行任务,
但是async.queue支持此需求:

1
2
3
4
5
6
7
8
9
10
let q = async.queue(function (file, callback) { 
fs.readFile(file,'utf-8',callback)
}, 2)
q.drain = function () {
// done list
}
fs.readdirSync('.').forEach(function (file) {
q.push(file, function (err, data) {
})
})