第四章 异步编程
函数式编程 高阶函数
即将函数 作为参数或返回值的函数
1 2 3 4 5 function foo (x ){ return function ( ){ return x; } }
高阶函数是回调函数 和深层嵌套 的基础
偏函数
偏函数用于创建一系列具有相同特征的函数, 即所谓的工厂函数。
偏函数案例1
重复实现类型判断函数:
1 2 3 4 5 6 let isString = function (obj ){ return toString.call (obj) == '[object Stirng]' } let isFunction = function (obj ){ return toString.call (obj) == '[object Function]' }
引入类型判断工厂函数:
1 2 3 4 5 let isType = function (type ){ return function (obj ){ return toString.call (obj) == `[object ${type} ]` ; } }
偏函数案例2
源自著名类库Underscore 的 after() 方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 let _after = function (time,func ){ if (time<=0 ) return func () return function ( ){ if (--time <= 0 ){ return func.apply (this ,arguments ) } } } let after_hello = _after (3 ,function ( ){console .log ('hello' )})after_hello () after_hello () after_hello ()
异步编程的优势与难点 实现异步的一般方式即事件循环 和线程池 机制, 但还有一种方式:
通过C/C++调用操作系统底层接口,手动实现异步I/O
优势 Node与云计算
非阻塞I/O可以使CPU与I/O不用相互等待, 这种组织资源的方式符合分布式 和云 的需要
I/O密集与CPU密集
事件循环中,JavaScript和I/O线程池分饰两角:
JavaScript(政客):分配任务+处理结果 I/O线程池(公务员):执行任务 最典型的应用即UI编程 , 因此Node更适合I/O密集型 场景
Node在CPU密集型应用的优化
V8的优化使得JavaScript本身的性能就很出色, 但Node在CPU密集型上的性能还需要考虑JavaScript代码组织的方式, 简单来说:只要CPU计算不影响I/O的调度,就没有问题。
建议
CPU耗用不超过10ms, 可以将大量的计算分解为小量计算,然后通过setImmediate 调度
难点 无法通过try/catch 操作抓住异步I/O回调函数的错误
1 2 3 4 5 6 7 8 9 10 11 const async = function (callback ) { process.nextTick (callback) } const err = function ( ) { throw new Error ('Catch me if you can' ) } try { async (err) } catch (e) { console .log ('catch you!' ) }
为了捕捉回调函数的异常,出现一种约定:
将异常作为回调函数的第一个实参
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 function async (callback ) { process.nextTick (function ( ) { let results = 'hello' try { throw new Error ('catch me if you can' ) } catch (e) { callback (e) return } callback (null ,results) }) } function callback (err, results ) { if (err) { console .log ('catch you!' ) } else { console .log (results) } } async (callback)
当多个操作之间存在依赖关系时,就会出现下面这种结构:
1 2 3 4 5 6 7 8 fs.readFile (template_path, 'utf_8' , function (err, template ){ db.query (sql, function (err, data ){ 110n .get (function (err,resources ){ }) }) })
JavaScript无法实现线程沉睡功能 , 下面是一种相当糟糕的模拟sleep的方式:
1 2 3 let start = new Date ()while (new Date () - start<1000 ){}
当提到JavaScript的单线程执行时,
浏览器层面指的是
UI渲染和JavaScript执行用的是同一个线程
Node层面则指
Web Workers
浏览器端提出的Web Workers 具有以下几种作用:
将JavaScript执行与UI渲染分离 使用CPU减少阻塞UI渲染 问题 但并没有优化UI渲染效率 可利用多核CPU 进行大量计算 利用消息机制 的理想模型 Web Workers由于浏览器标准原因并没有广泛推广开来, 但Node中借鉴了Web Workers的模式:
child_process 作为基础应用cluster模块 为更深层次应用异步编程风格对开发者的副作用不少,如:
但通过良好的流程控制 ,还是能够将逻辑梳理成顺序形式
异步编程解决方案 事件发布/订阅模式 events
1 2 3 4 5 6 7 const EventEmitter = require ('node:events' )class MyEmitter extends EventEmitter {}const myEmitter = new MyEmitter ()myEmitter.on ('event' ,()=> { console .log ('event occured!' ) }) myEmitter.emit ('event' )
emit调用的回调函数是异步触发 的
1 2 3 4 5 const emit = new EventEmitter () const eventName = 'event' emit.on (eventName, 侦听器1 ) emit.on (eventName, 侦听器2 ) emit.on (eventName, 侦听器3 )
事件发布/订阅模式的几个主要作用
一个事件能够绑定多个侦听器, 事件发布者无需关注订阅侦听器如何实现业务逻辑。
事件的设计关乎外部调用组件时是否优雅
钩子(hook)机制即导出内部数据或状态 给外部调用者,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 const http = require ('http' )const options = { host : '127.0.0.1' , port : 8080 , path : '/anypath' , method : 'POST' } const req = http.require (options, function (res ) { console .log (`STATUS: ${res.statusCode} ` ) console .log (`HEADERS: ${JSON .stringify(res.headers)} ` ) res.setEncoding ('utf8' ) res.on ('data' , function (chunk ) { console .log (`BODY: ${chunk} ` ) }) res.on ('end' ,function ( ){}) res.on ('error' , function (e ) { console .log (`problem with request: ${e.message} ` ) }) req.write ('data\n' ) req.write ('data\n' ) req.end () })
事件发布/订阅机制的健壮性
一个事件添加了超过10条侦听器 会得到一条警告:
1 2 3 4 5 6 7 8 9 10 11 12 const EventEmitter = require ('node:events' )const emitter = new EventEmitter ()for (let i = 0 ; i < 11 ; i++){ emitter.on ('event' ,function ( ){}) } emitter.emit ('event' )
这是为了防止内存泄漏而设置的警告, 可以通过调用setMaxListeners去掉限制:
1 2 3 4 5 const emitter = new EventEmitter ()emitter.setMaxListeners (0 ) for (let i = 0 ; i < 11 ; i++){ emitter.on ('event' ,function ( ){}) }
error事件
如果错误触发了error事件,EventEmitter会对error事件进行检测:
注册过error事件,将交由侦听器处理 未注册过,作为异常抛出
补充点
继承events模块 Node的util 模块中封装了继承的方法inherits
1 2 3 4 5 6 7 8 9 10 11 12 const events = require ('events' )const util = require ('util' )function Stream ( ) { events.EventEmitter .call (this ) } util.inherits (Stream , events.EventEmitter ) const emitter = new Stream ()emitter.on ('event' , function ( ) { console .log ('hello world' ) }) emitter.emit ('event' )
利用事件队列解决雪崩问题 雪崩的一种典型场景:用户重试导致系统负载升高,集中到服务器端即重复事件响应
所谓雪崩,就是在高访问量、大并发量 的情况下缓存失效的情景, 如下是一次sql查询的调用
1 2 3 4 5 let select = function (callback ) { db.select ('SQL' , function (results ) { callback (results) }) }
这时如果同时出现下面两种情况就会出现雪崩:
站点刚启动,缓存中不存在数据 访问量巨大,同一条SQL发送到数据库中重复查询 因此可以引入状态锁 :
1 2 3 4 5 6 7 8 9 10 let _status = 'ready' const select = function (callback ) { if (_status === 'ready' ) { _status = 'pending' db.select ('SQL' , function (results ) { _status = 'ready' callback (results) }) } }
但只有第一个发送的请求能获取到数据,其他的请求都被pending拒之门外了
解决方法:once
通过once添加的侦听器只会执行一次
1 2 3 4 5 6 7 8 9 10 11 12 13 const events = require ('events' )const proxy = new events.EventEmitter ()let status = 'ready' function selected (callback ) { proxy.once ('select' , callback) if (status === 'ready' ) { status = 'pending' db.select ("SQL" , function (results ) { proxy.emit ('selected' , results) status = 'ready' }) } }
Gearman异步应用框架 也能实现这种效果
多异步之间的协作方案 当一个业务逻辑 需要依赖多个通过回调或事件 传递的结果时, 就可能出现嵌套过深的问题:
1 2 3 4 5 6 7 8 9 10 function handle ( ){ onehandler (()=> { twohandler (()=> { threehandler (()=> { }) }) }) }
案例:回调嵌套过深的避免
渲染页面需要异步请求三类资源:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 let count = 0 let results = {}function done (key, value ) { count++ results[key] = value if (count >= 3 ) { render (results) } } fs.readFile (template_path, "utf8" , (err, template ) => { done ('template' , template) }) db.query (sql, (err, data ) => { done ('data' , data) }) l10n.get ((err, resources ) => { done ('resources' , resources) })
哨兵变量 :借助一个第三方函数/变量来处理异步协作的结果
偏函数+哨兵变量
1 2 3 4 5 6 7 8 9 10 11 12 function after (time, callback ) { let result = {} let count = 0 return function (key, value ) { result[key] = value count++ if (count >= time) { callback (result) } } } const done = after (3 , render)
事件与侦听器的关系
一对多 通过订阅模式实现侦听器发散
1 2 emitter.on ('event' ,listener1) emitter.on ('event' ,listener2)
多对一 通过偏函数实现事件收敛
1 2 3 const done = after (times, callback)emitter.on ('event1' , done) emitter.on ('event2' , done)
EventProxy的原理 EventProxy.all
EventProxy.all(e1,e2,…,listener)listener参数顺序 与订阅组合事件列表的参数 是一一对应 的 当每个事件都被触发之后,侦听器才会执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 const EventProxy = require ('./node_modules/eventproxy/index' )const proxy = new EventProxy ()proxy.all ( 'template' , 'data' , 'resources' , function (template, data, resources ) { console .log (...arguments ) } ) proxy.on ('template' , res => res) proxy.on ('data' , res => res) proxy.on ('resources' , res => res) proxy.emit ('template' ,'template' ) proxy.emit ('data' ,'data' ) proxy.emit ('resources' ,'resources' )
EventProxy.tail
EventProxy.tail 与all的区别在于: 组合事件中某个事件被再次触发 , 侦听器会使用最新的数据 继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 const EventProxy = require ('./node_modules/eventproxy/index' )const proxy = new EventProxy ()proxy.on ('eventA' , res => res) proxy.on ('eventB' , res => res) proxy.all ('eventA' , 'eventB' , (a, b ) => { console .log (`all: ${a} ${b} ` ) }) proxy.tail ('eventA' , 'eventB' , (a, b ) => { console .log (`tall: ${a} ${b} ` ) }) proxy.emit ('eventA' , 'AAA' ) proxy.emit ('eventB' , 'BBB' ) proxy.emit ('eventA' , 'AAAAAAA' )
EventProxy.after
EventProxy.after(event, time, listener) listener会在event事件触发time次后执行
1 2 3 4 5 6 7 8 9 const EventProxy = require ('./node_modules/eventproxy/index' )const proxy = new EventProxy ()proxy.after ('event' , 3 , (data ) => { console .log (data) }) proxy.on ('event' , res => res) proxy.emit ('event' , 'AAA' ) proxy.emit ('event' , 'BBB' ) proxy.emit ('event' , 'CCC' )
EventProxy基本原理
EventProxy来自于BackboneJs 的事件模块
EventProxy的异常处理 手动捕捉每一个异步请求的异常非常麻烦:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 const EventProxy = require ('./node_modules/eventproxy/index' )const ep = new EventProxy ()ep.all ('tpl' , 'data' , function (tpl, data ) { callback (null , { template : tpl, data : data }) }) ep.bind ('error' , function (err ) { ep.unbind () callback (err) }) fs.readFile ('template.tpl' , 'utf-8' , function (err, content ) { if (err) {return ep.emit ('error' ,err)} ep.emit ('tpl' , content) }) db.get ('some sql' , function (err, result ) { if (err) {return ep.emit ('error' , err)} ep.emit ('data' ,result) })
fail && done
EventProxy.fail 绑定错误处理函数 EventProxy.done 进行异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 function callback (err, res ) { if (err) { console .log (err) return } console .log (res) } const EventProxy = require ('./node_modules/eventproxy/index' )const ep = new EventProxy ()ep.all ('eventA' , 'eventB' , function (returnA, returnB ) { callback (null , { returnA, returnB }) }) ep.fail (callback) ep.on ('eventA' , res => res) ep.on ('eventB' , res => res) ep.done ('eventA' )(null , 'AAA' ) ep.done ('eventB' )('eventB-error' , 'BBB' )
fail
等同于
1 2 3 ep.fail (function (err ){ callback (err) })
等同于
1 2 3 4 ep.bind ('error' , function (err ){ ep.unbind () callback (err) })
done
done方法实际上是对事件触发的一种包装
ep.done(eventName)
等同于
1 2 3 4 5 6 function (err, content ){ if (err){ return ep.emit ('error' , err) } ep.emit ('event' , content) }
ep.done(contentFun)
done中也只传一个回调函数, 但需要在函数中手动emit
1 2 3 ep.done ((content )=> { ep.emit ('event' ,content) })
等同于
1 2 3 4 5 6 7 8 function (err,content ){ if (err){ return ep.emit ('error' ,err) } (function (content ){ ep.emit ('event' ,content) }(content)) }
ep.done(eventName, contentFun)
也可以即传入事件名称,又传入回调函数, 这里的回调函数主要是用于处理返回值的
1 2 3 4 ep.done ('event' ,function (content ){ return content })
等同于
1 2 3 4 5 6 7 8 9 function (err, content ){ if (err){ return ep.emit ('error' , err) } ep.emit ('event' ,(function (content ){ return content })(content)) }
Promise/Deferred模式 在事件订阅模型中,事件执行后的回调函数需要在注册时就明确指定
1 emitter.emit ('event' ,callback)
Promise/Deferred模式可以说是一种先执行异步调用延迟处理 的模式, 这种模式最早出现在Dojo 中:
异步Promise/Deferred模型
Promises/A 对异步操作下了三个抽象定义:
Promise操作仅存在3种状态: Promise状态只存在两种转化过程: Promise状态一旦转变,将不能被更改 then方法
then方法实际做的事就是:将回调函数存放起来
1 then (fulfilledHandler, errorHandler, progressHandler)
fullfilledHandler 完成态回调方法errorHandler 错误态回调方法progressHandler progress事件回调方法使用events完成then的简单实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 const EventEmitter = require ('events' ).EventEmitter const Promise = function ( ) { EventEmitter .call (this ) } util.inherits (Promise , EventEmitter ) Promise .prototype .then = function (fullfilledHandler, errorHandler, progressHandler ) { if (typeof fullfilledHandler === 'function' ) { this .once ('success' , fullfilledHandler) } if (typeof errorHandler === 'function' ) { this .once ('error' , errorHandler) } if (typeof progressHandler === 'function' ) { this .once ('progress' , progressHandler) } return this }
事件都注册之后,如何触发这些事件?
Deferred 延迟对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 let Deferred = function ( ) { this .state = 'unfilfilled' this .promise = new Promise () } Deferred .prototype .resolve = function (obj ) { this .state = 'fulfilled' this .promise .emit ('success' , obj) } Deferred .prototype .reject = function (err ) { this .state = 'failed' this .promise .emit ('error' , err) } Deferred .prototype .progress = function (data ) { this .promise .emit ('progress' , data) }
响应对象的封装
典型响应对象的封装:
1 2 3 4 5 6 7 8 9 10 res.setEncoding ('utf8' ) res.on ('data' , function (chunk ) { console .log ('BODY: ' + chunk) }) res.on ('end' , function ( ) { }) res.on ('error' , function (err ) { })
使用then 进行封装:
1 2 3 4 5 6 7 res.then (function ( ) { }, function (err ) { }, function (chunk ) { console .log ('BODY: ' + chunk) })
引入Deferred 对象的封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 const promisify = function (res ) { let deferred = new Deferred () let result = '' res.on ('data' , function (chunk ) { result += chunk deferred.progress (chunk) }) res.on ('end' , function ( ) { deferred.resolve (result) }) res.on ('error' , function (err ) { deferred.reject (err) }) return deferred.promise } promisify (res).then (function ( ) { }, function (err ) { }, function (chunk ) { console .log ('BODY: ' + chunk) })
Promise && Deferred
Deferred Promise 用于外部 通过then方法 给外部提供添加自定义逻辑入口 控制业务的可变部分 Deferred与业务的简化
并不是使用Deferred都可以简化业务, 因为对于不同的业务场景,都需要封装和改造Deferred, 对于不常使用的场景,考虑到封装和改造Deferred花费的时间,并不一定真正简洁划算。
Promise与event
Promise 高级接口 一旦定义,不太容易变化,缺少灵活性 方便解决典型问题 events Q model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 const Q = require ('q' )const fs = require ('fs' ) const readFile = function (file, encoding ) { let deferred = Q.defer () fs.readFile (file, encoding, deferred.makeNodeResolver ()) return deferred.promise } readFile ('hello.txt' , 'utf-8' ).then (function (data ) { console .log (data) }, function (err ) { console .log (err) })
异步编程中异常的判断和处理 非常重要, Promise能够实现:
正向用例(success)和反向用例(error)的分离 逻辑处理延迟(then)
Promise的多异步协作
all的简单实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Deferred .prototype .all = function (promises ) { let count = promises.length let that = this let results = [] promises.forEach (function (promise, i ) { promise.then (function (data ) { count-- results[i] = data if (count === 0 ) { that.resolve (resolves) } }, function (err ) { that.reject (err) }) }) return this .promise }
all的使用
1 2 3 4 5 6 7 let promise1 = readFile ('hello.txt' , 'utf-8' )let promise2 = readFile ('world.txt' , 'utf-8' )let deferred = new Deferred ()deferred.all ([promise1, promise2]).then ( res => { }, err => { } )
Promise进阶知识
Promise需要根据具体使用场景进行不同的封装, 与原生事件相比灵活性低一些, 但是更加适用于一些经典场景 。
前端自动化测试 在编写自动化测试时,前端程序员使用JavaScript 可以减轻切换环境的麻烦, 但是使用Node与远程服务器进行指令发送时, 使用到的网络库 是完全异步 的(而其它语言都是同步调用的 ), 因此在多个异步操作之间存在逻辑关系时,会采用Deferred模式
Pyramid of Doom
作为介绍Promise队列操作的铺垫, 有必要提一下大名鼎鼎的Pyramid of Doom ,即恶魔金字塔 。
为了完成一系列彼此间存在逻辑关系的异步操作, 有如下三种地狱模式可供选择:
1 2 3 4 5 6 7 8 9 obj.api1 (function (value1 ) { obj.api2 (value1, function (value2 ) { obj.api3 (value2, function (value3 ) { obj.api4 (value3, function (value4 ) { callback (value4) }) }) }) })
1 2 3 4 5 6 7 8 9 10 11 12 13 let handler1 = function (value1 ) { obj.api2 (value1, handler2) } let handler2 = function (value2 ) { obj.api3 (value2, handler3) } let handler3 = function (value3 ) { obj.api4 (value3, handler4) } let handler4 = function (value4 ) { callback (value4) } obj.api1 (handler1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 const events = require ('events' )const emitter = new events.EventEmitter ()emitter.on ('step1' , function ( ) { obj.api1 (function (value1 ) { emitter.emit ('step2' , value1) }) }) emitter.on ('step2' , function (value1 ) { obj.api2 (value1, function (value2 ) { emitter.emit ('step3' , value2) }) }) emitter.on ('step3' , function (value2 ) { obj.api3 (value2, function (value3 ) { emitter.emit ('step4' , value3) }) }) emitter.on ('step4' , function (value3 ) { obj.api4 (value3, function (value4 ) { callback (value4) }) }) emitter.emit ('step1' )
Promise链式调用原理
1 2 3 4 5 6 7 8 9 10 11 promise () .then (obj.api1 ) .then (obj.api2 ) .then (obj.api3 ) .then (obj.api4 ) .then (function (value4 ) { }, function (err ) { }) .done ()
改造Promise以实现链式调用 (两步):
准备好回调队列 逐个执行回调,当回调返回新的Promise对象时:新Promise对象继承回调队列 Deferred对象更新为新Promise对象1 2 3 4 5 6 7 let Promise = function ( ){ this .queue = [] this .isPromise = true } let Deferred = function ( ){ this .promise = new Promise () }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Deferred .prototype .resolve = function (obj ){ let promise = this .promise let handler while ((handler = promise.queue .shift ())){ if (handler && (handler.fulfilled )){ let ret = handler.fulfilled (obj) if (ret && ret.isPromise ){ ret.queue = promise.queue this .promise = ret return } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Deferred .prototype .reject = function (err ) { let promise = this .promise let handler while ((handler = promise.queue .shift ())) { if (handler && handler.error ) { let ret = handler.error (err) if (ret && ret.isPromise ) { ret.queue = queue.queue this .promise = ret return } } } }
1 2 3 4 5 6 7 8 9 Deferred .prototype .callback = function ( ){ const that = this return function (err, file ){ if (err){ return that.reject (err) } return that.resolve (file) } }
1 2 3 4 5 6 7 8 9 10 11 Promise .prototype .then = function (fulfilledHandler, errorHandler ){ let hander = {} if (typeof fulfilledHandler === 'function' ){ handler.fulfilled = fulfilledHandler } if (typeof errorHandler === 'function' ){ hander.error = errorHandler } this .queue .push (handler) return this }
使用链式调用改造回调地狱案例:
1 2 3 4 5 6 7 8 9 10 obj.api1 (value1) .then ((value2 )=> { return obj.api2 (value2) }) .then ((value3 )=> { return obj.api3 (value3) }) .then ((value4 )=> { callback (value4) })
批量将方法Promise异步化
这样如果希望能实现依次读取文件的功能, 需要对读取文件函数进行异步包装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 const fs = require ('fs' )const Q = require ('q' )function readFile (file, encoding ) { let defer = Q.defer () fs.readFile (file, encoding, defer.makeNodeResolver ()) return defer.promise } readFile ('file1.txt' , 'utf-8' ) .then (res => { console .log (res) return readFile (res,'utf-8' ) }) .then (res => { console .log (res) })
也可以构建能够对任何方法进行异步包装的偏函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 const fs = require ('fs' )const Q = require ('q' )function smooth (method ) { return function ( ) { let deferred = Q.defer () let args = Array .prototype .slice .call (arguments , 0 ) args.push (deferred.makeNodeResolver ()) method.apply (null , args) return deferred.promise } } const readFile = smooth (fs.readFile )readFile ('file1.txt' , 'utf-8' ) .then (res => { console .log (res) return readFile (res,'utf-8' ) }) .then (res => { console .log (res) })
流程控制库 除了事件/订阅 和Promise/Defer 模式以外,还有一些尝试用的流程控制库:
尾触发与Next
尾触发: 指需要手工调用 才能持续执行后续调用的方法next: 尾触发的常见关键词Connect中间件: 尾触发目前应用最多的地方
1 2 3 4 5 6 7 8 9 const connect = require ('connect' )const fs = require ('fs' )let app = connect ()app.use (function middleware1 (req, res, next ) { next () }) app.use (function middleware2 (req, res, next ) { next () })
MiddleWare 中间件机制
中间件机制在处理网络请求时, 可以像面向切面编程 一样进行过滤、验证、日志 等功能, 不与业务逻辑产生耦合关联
面向切面编程 AOP AOP Aspect Oriented Program 简单来说,就是在运行中动态将代码切入到类的指定方法、指定位置, 是对OOP编程的一种补充。
connect核心实现
app.stack 存放服务器内部维护的中间件队列
1 2 3 4 5 6 7 8 9 10 11 function createServer ( ) { function app (req, res ) { app.handle (req, res) } utils.merge (app, proto) utils.merge (app, EventEmitter .prototype ) app.route = '/' app.stack = [] for (let i = 0 ; i < arguments .length ; ++i){ app.use (arguments [i]) } return app }
app.use 用于将中间件放入stack队列
1 2 3 4 5 6 7 app.use = function (route, fn ){ this .stack .push ({ route : route, handle : fn }) return this }
app.listen 监听函数借助http 模块实现
1 2 3 4 5 const http = require ('http' )app.listen = function ( ){ const server = http.createServer (this ) return server.listen .apply (server, arguments ) }
app.handle ,每一个监听到的网络请求都从此处开始处理
1 2 3 app.handle = function (req, res, out ){ next () }
将队列中的中间件取出并执行, 随后递归调用、持续触发
1 2 3 4 5 6 function next (err ) { layer = stack[index++] layer.handle (req, res, next) }
Connect的流式处理
Connect中的尾触发一般用于拆分对网络请求 的处理, 能够将串行逻辑扁平化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 const connect = require ('connect' )const app = connect ()app.use ('/' , function (req, res, next ) { next () }) app.use ('/' , function (req, res, next ) { next () }) app.use ('/' , function (req, res, next ) { res.end () }) http.createServer (app).listen (3000 )
async
流程控制模块async提供用于处理异步的各种协作模式 :
async.series 无依赖的异步串行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 const async = require ('async' )const fs = require ('fs' )async .series ([ function (callback ) { fs.readFile ('file1.txt' ,'utf-8' ,(err,data )=> callback (null ,data)) }, function (callback ) { fs.readFile ('file2.txt' ,'utf-8' ,(err,data )=> callback (null ,data)) } ], function (err, results ) { if (err) { throw err } console .log (results) })
async.series第一个参数事件数组 中元素有如下结构:
1 2 3 4 (callback)=>{ callback (result) }
这里的callback 实际上与connect.next 类似, 用于结束本次执行,开始下次执行, 并且其传参是用于将本次执行的结果保存起来的
手动实现异步并行执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 let counter = 2 let results = []let hasErr = false function done (data ) { counter -= 1 results.push (data) if (counter == 0 ) { callback (null , results) } } function fail (err ) { if (!hasErr) { hasErr = true callback (err) } } function callback (err, result ) { if (err) { throw err } console .log (result) } fs.readFile ('file1.txt' , 'utf-8' , (err, data ) => { if (err) { fail (err) return } done (data) }) fs.readFile ('file2.txt' , 'utf-8' , (err, data ) => { if (err) { fail (err) return } done (data) })
EventProxy实现异步并行执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 const EventProxy = require ('eventproxy' )const fs = require ('fs' )const proxy = new EventProxy ()function callback (err, res ) { if (err) throw err console .log ('result' ,res) } proxy.all ('content' , 'data' , function (content, data ) { callback (null , [content, data]) }) proxy.fail (callback) fs.readFile ('file1.txt' , 'utf-8' , proxy.done ('content' )) fs.readFile ('file2.txt' , 'utf-8' , proxy.done ('data' ))
parallel 异步并行执行
1 2 3 4 5 6 7 8 9 10 11 12 async .parallel ([ function (callback ) { fs.readFile ('file1.txt' ,'utf-8' ,callback) }, function (callback ) { fs.readFile ('file2.txt' ,'utf-8' ,callback) }], function (err, res ) { if (err) { throw err } console .log (res) } )
waterfall 结果存在依赖关系的异步串行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async .waterfall ([ function (callback ) { fs.readFile ('file1.txt' , 'utf-8' , function (err, content ) { callback (err,content) }) }, function (arg1, callback ) { fs.readFile (arg1, 'utf-8' , function (err, content ) { callback (err,content) }) } ], function (err, res ) { if (err) throw err console .log (res) })
auto 自动依赖处理
对于存在复杂依赖关系的业务环境,auto能够实现:根据依赖关系自动分析,以最佳的顺序执行业务 (强!!!)
1 2 3 4 5 6 7 8 9 10 let deps = { fun1 :callback =>callback (), fun2 :['fun3' ,'fun4' ,callback =>callback ()], fun3 :['fun1' ,callback =>callback ()], fun4 :['fun4' ,callback =>callback ()] } async .auto (deps)
比如有以下一系列业务需要实现:
从磁盘读取配置文件 readConfig 根据配置文件连接MongoDB connectMongoDB 根据配置文件连接Redis connectRedis 编译静态文件 complieAsserts 上传静态文件到CDN uploadAsserts 启动服务器 startup EventProxy实现此场景 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 const EventProxy = require ('eventproxy' )const proxy = new EventProxy ()proxy.asap ('readtheconfig' , function ( ) { proxy.emit ('readConfig' ) }).on ('readConfig' , function ( ) { proxy.emit ('connectMongoDB' ) }).on ('readConfig' , function ( ) { proxy.emit ('connectRedis' ) }).assp ('complieasserts' , function ( ) { proxy.emit ('complieAsserts' ) }).on ('complieAsserts' , function ( ) { proxy.emit ('uploadAsserts' ) }).all ('connecctMongoDB' , 'connectRedis' , 'uploadAsserts' , function ( ) { })
生成依赖树如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 async .auto ({ 'readConfig' : function (callback ) { callback () }, 'connectMongoDB' : ['readConfig' , function (callback ) { callback () }], 'connectRedis' : ['readConfig' , function (callback ) { callback () }], 'complieAsserts' : function (callback ) { callback () }, 'uploadAsserts' : ['complieAsserts' , function (callback ) { callback () }], 'startup' : [ 'connectMongoDB' , 'connectMongoDB' , 'complieAsserts' , 'uploadAsserts' , function (callback ) { callback () } ] })
Step
更加轻量级 的流程控制库:Tim Caswell 的 Step
1 2 step (task1,task2,task3)
step的this 关键字实际上发挥的是next 方法的作用
1 2 3 4 5 6 7 8 9 10 11 12 13 const step = require ('step' )const fs = require ('fs' )step ( function readFile1 ( ) { fs.readFile ('file1.txt' ,'utf-8' ,(err,data )=> this (data)) }, function readFile2 (param ) { fs.readFile (param,'utf-8' ,this ) }, function done (err, content ) { console .log (content) } )
并行任务执行 parallel
parallel: 告诉Step需要等所有任务完成时 才能进行下一个任务
注意:step只能取到异步方法传回来的前2个参数 , 此方法的原理在于计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 let asyncCall = function (callback ) { process.nextTick (function ( ) { callback (null ,'result1' ,'result2' ) }) } step ( function readFile1 ( ) { fs.readFile ('file1.txt' , 'utf-8' , this .parallel ()) fs.readFile ('file2.txt' , 'utf-8' , this .parallel ()) asyncCall (this .parallel ()) }, function done ( ) { console .log (arguments ) } )
结果分组 group
group 从流程控制上说和parallel 很相似,但是会将结果打包成数组传回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 step ( function readDir ( ) { fs.readdir (__dirname, this ) }, function readFiles (err, results ) { if (err) throw err let group = this .group () results.forEach (filename => { if (/\.txt$/ .test (filename)) { fs.readFile (__dirname+'/' +filename,'utf-8' ,group ()) } }) }, function showAll (err, files ) { if (err) throw err console .dir (files) } )
为什么要调用两次group?
第一次调用:告知step要并行执行
1 let group = this .group ()
第二次调用:用于生成回调函数接受返回值按组存储
1 fs.readFile (__dirname+'/' +filename,'utf-8' ,group ())
wind
Monadic 单子 monadic ,是一种用于处理具有副作用的计算, 如I/O操作、异常处理和状态管理等。
wind算是对JavaScript实现的一种Monadic扩展, 为了说明wind 的具体使用场景,我们先提出一个比较特殊的需求:
实现冒泡排序的动画
冒泡排序代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 function swap (a, i, j ) { let t = a[i] a[i] = a[j] a[j] = t } function compare (a, b ) { return (a-b) > 0 } function bubbleSort (array ) { for (let i = 0 ; i < array.length ; i++){ for (let j = 0 ; j < array.length -i-1 ; j++){ if (compare (array[j], array[j + 1 ])) { swap (array,j,j+1 ) } } } return array }
如果需要添加动画效果的话,只要在swap 函数中添加就可以, 但是动画效果存在延时 , 在延时时段中动画还没有执行完,数组中的数据就又开始变化了, 因此该如何让动画执行完之后再进行下一次的数据交换呢?
使用yield实现sleep 也可以使用ES6中的yield实现流程控制, 在每次冒泡交换之后添加yield 阻塞流程, 转而花费n秒控制动画延时, 动画执行结束,再手动调用生成器函数的next 方法。
核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 function * bubbleSort (array ){ for (let i=0 ;i<array.length ;i++){ for (let j=0 ;j<array.length -i-1 ;j++){ if (compare (array[j],array[j+1 ])){ swap (array,j,j+1 ) yield 'stop' } } } } function next (gen ){ let n = gen.next () if (n.done ){ return } paint ().then (()=> next (gen)) } let gen = bubbleSort (arr)next (gen)
使用wind实现此功能代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 const Wind = require ('wind' )let swapAsync = eval (Wind .compile ("async" , function swap (a, i, j ) { $await(Wind .Async .sleep (500 )) var t = a[i] a[i] = a[j] a[j] = t console .log ('repaint' ) })) function compare (a, b ) { return (a - b) > 0 } let bubbleSort = eval (Wind .compile ("async" , function (array ) { for (var i = 0 ;i<array.length ;i++){ for (var j = 0 ; j < array.length - i -1 ;j++){ if (compare (array[j],array[j+1 ])){ $await(swapAsync (array,j,j+1 )) } } } })) bubbleSort ([7 ,-3 ,10 ,-1 ,0 ,99 ]).start ()
wind的特异之处
Wind.compile()对普通函数进行编译 , 再交给 eval() 执行
定义异步任务
1 eval (Wind .compile ("async" ,function ( ){}))
内置对setTimeout()的封装
$await 实际是一个等待的 占位符 , 表示等待完成异步方法
任务模型
每一个异步操作 都可以转换为一个任务 , 这也就是wind基于的任务模型 。
正式的任务创建方法:Task.create()/complete()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 const Wind = require ('wind' )const Task = Wind .Async .Task const fs = require ('fs' )let readFileAsync = function (file, encoding ) { return Task .create (function (t ) { fs.readFile (file, encoding, function (err, file ) { if (err) { t.complete ('failure' , err) } else { t.complete ('success' , file) } }) }) } let serial = eval (Wind .compile ("async" , function ( ) { var file1 = $await(readFileAsync ('file1.txt' , 'utf-8' )) console .log (file1) var file2 = $await(readFileAsync ("file2.txt" , 'utf-8' )) console .log (file2) try { var file3 = $await(readFileAsync ("file4.txt" ,'utf-8' )) } catch (err) { console .log (err) } })) serial ().start ()
异步并行
直接使用 $await(asyncTask()) 的方式能实现异步任务串行执行, 实现了不阻塞CPU阻塞代码的目的
Wind实现异步任务并行执行:Task.whenAll
1 2 3 4 5 6 7 8 9 let parallel = eval (Wind .compile ('async' , function ( ) { var results = $await(Task .whenAll ({ file1 : readFileAsync ('file1.txt' , 'utf-8' ), file2 : readFileAsync ('file2.txt' ,'utf-8' ) })) console .log (results) })) parallel ().start ()
1 2 3 4 5 let series = eval (Wind .compile ('async' ,function ( ){ $await(asyncTask ()) }))
可以看到引入wind之后,异步函数实际上已经和同步调用十分近似了 因此很适合用于从已有的同步代码向Node端迁移 wind提供了两个辅助方法:
Wind.Async.Binding.fromCallback Wind.Async.Binding.fromStandard fromCallback 转换无异常的调用
无异常的调用通常只有一个参数返回:
1 2 3 4 5 fs.exists ("/etc/passwd" ,function (exists ){ }) let existsAsync = Wind .Async .Binding .fromStandard (fs.exists )
fromStandard 转换带异常的调用
带异常的调用的第一个参数作为异常标识
1 2 3 4 fs.readFile ('file1.txt' ,function (err,data ){ }) let readFileAsync = Wind .Async .Binding .fromStandard (fs.readFile )
小结 方案对比
事件发布/订阅模式 Promise/Deferred模式 流程控制库
异步并发控制 所谓异步并发性能的关键可以总结为这样一个问题:
怎样发起异步并发,能够实现既充分压榨底层系统的性能,同时给予其一定过载保护,防止过犹不及?
泪目,资本家竟是我自己
如果不对并发量作限制的话,连续大量的并发请求会迅速用光下层服务器的资源, 针对并发量的限制问题,这里给出了两个解决方案:
bagpipe
async.parallelLimit / async.queue
bagpipe解决方案 bagpipe解决思路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 let allTaskList = [] let activeTaskList = [] let limitNumber = 100 function parallelControl ( ){ if (allTaskList.length && activeTaskList.length <limitNumber){ let task = allTaskList.shift () activeTaskList.push (task) task.then ((res )=> { activeTaskList.del (task) parallelControl () }) }else { parallelControl () } }
实际使用到两个bagpipe的API:
1 2 3 4 5 6 7 8 9 const Bagpipe = require ('bagpipe' )const fs = require ('fs' )let bagpipe = new Bagpipe (3 )bagpipe.on ('full' ,function (length ){ console .log (`query block:[${length} ]` ) }) for (let i = 0 ; i<5 ; i++){ bagpipe.push (fs.readFile ,'file1.txt' ,'utf-8' ,function (err,data ){}) }
bagpipe的配置
disabled refuse 拒绝模式 针对对实时性 要求高的场景 限流分批发起请求可能导致一部分请求必须等待一段时间,无法满足丢时间要求较高的数据请求 并行队列一旦阻塞就返回失败报错 timeout
async的解决方案 async.parallelLimit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 const async = require ('async' )const fs = require ('fs' )async .parallelLimit ([ function (callback ) { fs.readFile ('file1.txt' ,'utf-8' ,callback) }, function (callback ) { fs.readFile ('file2.txt' ,'utf-8' ,callback) } ], 1 , function (err, results ) { if (err) { throw err } console .log (results) })
1 2 3 4 5 async .parallelLimit ( [], limit, callback )
async.queue
async.parallelLimit 无法动态增加并行任务, 但是async.queue 支持此需求:
1 2 3 4 5 6 7 8 9 10 let q = async .queue (function (file, callback ) { fs.readFile (file,'utf-8' ,callback) }, 2 ) q.drain = function ( ) { } fs.readdirSync ('.' ).forEach (function (file ) { q.push (file, function (err, data ) { }) })