Typescript第八章 异步编程并发并行

异步API,比如说回调promise和流。

JavaScript引擎在一个线路中多路复用任务,而其他任务则处于空闲状态。这种事件循环是JavaScript引擎标准线程模型

多路复用是指在一个线程中同时处理多个任务

异步编程让程序难以理解我们不能一行一行的分析程序

Typescript提供了工具通过类型可以追踪异步操作借助内置的async/await可以把熟悉的同步思想运用到异步程序上。使用Typescript可以多线程程序指定严格的的消息传递协议

我们先讨论一下JavaScript引擎中具体是如何运作的,弄清楚为什么看似单个的线程可以暂停和恢复执行

8.1 JavaScript的事件循环

下面代码怎么执行?

 setTimeout(()=>console.log("A"),1)// 时间到了就加入事件队列等待执行
 setTimeout(()=>console.log("B"),2)
 console.log("C");

JavaScript和C语言sleep才用的并发模型,以及java作业调度到另外一个线程不同

概括的说,JavaScriptVM采用下述方式模拟并发

8.2 处理回调

JavaScript异步程序的核心基础就是回调。回调其实就是常规的函数,只是作为参数传给另一个函数。就像在同步程序中一样,另一个函数在做完操作(处理网络请求等)之后调用回调函数。异步代码调用的回调也是函数,而且类型签名没有标明函数是异步调用的。

NodeJS的原生API。例如fs.readFile(采用异步方式从磁盘读取文件内容)和dns.resolveCname采用异步方式解析CNAME记录),按照约定,回调的第一个参数错误null第二个参数结果null。(错误优先)。

readFile的类型签名如下

 function readFile(
     path:string,
     options:{encoding:string,flag?:string},
     callback:(err:Error|null,data:string|null)=>void
 ):void{}

注意readFile和callback类型没有什么特别之处,都是常规的JavaScript函数。签名没有特别标明readFile是异步的,也没有指出在调用readFile之后控制权会立即传给下一行代码(不等待readFile的结果)。

pnpm add @types/node -D安装nodejs类型声明

下面写一个案例读取文件

 import * as fs from 'fs'
 fs.readFile(
     "log.txt",
     {encoding:'utf-8'},
     (error,data)=>{
         if(error){
             console.log(error);
             return
         }
         console.log("success!:"+data);
     }
 )
 // 采用并发写入
 fs.appendFile(
     "log.txt",
     "笑死我了",
     error=>{
         if(error){
             console.log("error"+error);
         }
     }
 )

这里API是异步的,不能把API在代码中调用顺序理解为执行文件系统操作顺序readFile虽然在前面调用但是读取出来的访问日志可能没有后面新增的那行日志,具体有没有要看运行这段代码时文件系统有多繁忙。

Nodejs一般约定,如果函数的最后一个参数是一个接受两个参数的函数,而且顺序为Error|null和T|null类型,那么这个函数通常是异步的。

无论如何,从类型上是看不出来的。

回调函数容易产生一个问题,回调地狱

 setTimeout(()=>{
     console.log(1);
     setTimeout(()=>{
         console.log(2);
         setTimeout(()=>{
             console.log(3);
             console.log("啊");
         },1)
     },1)
 },1)

顺序执行的操作往往是一环扣一环,前一步成功才执行下一步,除非遇到错误,我们要自己手动维护执行的顺序。按一定顺序执行的回调很容易出错

有序操作只是我们想借助异步任务执行的一种操作,此外我们可能还想并行运行几个函数,获知全部函数何时运行完毕,或者让几个函数竞争,只获取第一个结束的函数返回结果等。

  • 使用回调函数可执行简单的异步任务
  • 虽然回调适合处理简单的任务,但是如果异步任务变多,很容易一团乱麻。

8.3 promise(我tm来了!!!)

我们不是第一批遇到这些限制程序员本节说明promise这个概念对异步进抽象,方便任务编排排列任务等。即使你以前使用promise或future,也能更好理解他们原理

 // 自行分析
 console.log("start");
 async function async1() {
     console.log("async1");
     let result = await async2()
     console.log(result);
 }
 async function  async2() {
     console.log("async2");
     return Promise.resolve("wohaole")
 }
 async1()
 console.log("end");

下面举个例子,指出我们想如何使用Promise:先向文件添加一些内容然后再把文件中的内容读取出来:

 function appendAndReadPromise(
     path:string,
     data:string
 ):Promise<string>{
     return appendPromise(
         path,data
     ).then(()=>readPromise(path))
     .catch(error=>console.log(error);)
 }

注意这里没有回调地狱,我们把想执行的一些异步任务,变成了易于理解线性链条:前一个任务完成后才能执行下一个任务,倘若失败,则跳到catch子句假如基于回调的API,那么写出的代码可能是下面这样:

 function appendAndRead(
     path: string,
     data: string,
     cb: (error: Error | null, result: string | null | any) => void
 ) {
     appendFile(path, data, err => {
         if (err) {
             return cb(err, null)
         }
         readFile(path, (err, result) => {
             if (err) {
                 return cb(err, null)
             }
             cb(null, result)
         })
     })
 }

针对这个设想,我们手动实现PromiseAPI

new Promise接受一个函数,我们称为执行器executor)。在Promise的实现中,执行器接受两个参数,一个是resolve函数,一个是reject函数。

 type Executor = (
     resolve: Function,
     reject: Function
 ) => voidclass Promise {
     constructor(f:Executor){}
 }

那么,resolve和reject如何运行的呢?下面通过示例说明一下。假设我们把Nodejs中的一个回调API(例如:fs.readFile)改造成基于Promise的API,Nodejs内置的fs.readFileAPI时这样使用的:

 import {readFile} from 'fs'
 readFile(path,(error,result)=>{
     //..
 })
 import {readFile} from 'fs'
 function readFilePromise(path:string):Promise<string>{
     return new Promise((resolve,reject)=>{
         readFile(path,(err,result)=>{
             if(err){
                 reject(err)
             }
             resolve(result)
         })
     })
 }

可见,resolve参数什么类型取决于具体使用的API(这里,其参数的类型就是result的类型),而reject的参数始终是Error类型。因此,我们要更行类型,把不安全的Function改为更具体的类型:

 type Executor<T,E extends Error> = (
     resolve: (result:T)=>void,
     reject: (error:E)=>void
 ) => voidclass Promise<T,E extends Error> {
     constructor(f:Executor<T,E>){}
 }
 class Promise<T,E extends Error> {
     constructor(f:Executor<T,E>){}
     then<U,F extends Error>(g:(result:T)=>Promise<U,F>):Promise<U,F>{//}
     catch<U,F extends Error>(g:(error:E)=>Promise<U,F>):Promise<U,F>{//}
 }

then和catch不同的方式排列Promise:then把成功从一个Promise获得的结果映射到一个新Promise上,catch则把错误映射到一个新的Promise上,从被拒绝状态中走出去。(这种风格和上节中的Option设计模式一样都受函数式编程语言Haskell中的Monad设计模式影响

then的使用方法

 let a:()=>Promise<string,TypeError> = //..
 let b:(s:string)=>Promise<number,never> = //
 let c:()=>Promise<boolean,RangeError>=//
 a().then(b).catch(e=>c())
 .then(result=>console.info("Done",result))
 .catch(e=>console.log("error",e))

此外,我们还有处理Promise真正抛出异常的情况(例如,throw Error(‘foo’)),为此,在实现then和catch要把代码放在try/catch中,在cattch分支处理被拒绝的情况,然后事情并没有那么简单,这里还涉及一些其他问题

  1. Promise对象都有可能被拒,而通过静态检查发现不了这个问题
  2. Promise对象被拒不一定是因为有错。Typescript别无选择,只能继承JavaScript的行为,而在JavaScript中,通过throw可以抛出一切。

考虑两点,我们要放宽对Promise类型的要求,不指定错误类型

 type Executor<T> = (
     resolve: (result: T) => void,
     reject: (error: unknown) => void
 ) => voidclass Promise<T> {
     constructor(f: Executor<T>) { }
     then<U>(g: (result: T) => Promise<U>): Promise<U> {
         // 
     }
     catch<U>(g: (error: unknown) => Promise<U>): Promise<U>{
         // 
     }
 }
 let a = new Promise((resolve,reject)=>{
     resolve(123)
 })
 a.then((result)=>{
     // 
 })

至此,Promise封装好了,内部具体实现,自行研究

8.4 async和await

promise对于异步代码所做的抽象十分强大,JavaScript自身(当然包括Typescript)也有相应的句法:async和awai使用这种句法,可以像同步操作那样处理异步操作

await可以视为.then在语言上的语法糖。使用await处理Promise对象时,要把相关的代码放在async块中。这种情况下不再使用.catch,而是把await放在常规的try/catch中。

 // 老写法
 function getUser(){
     getUserId(18).then(user=>getLocation(user))
     .then(location=>console.log(location))
     .catch(error=>console.log(error))
     .finally(()=>console.log("done"))
 }
 async function getUser() {
     try {
         let user = await getUserId(18)
         let location = await getLocation(user)
         console.log(location);
     } catch (error) {
         console.log("error");
     } finally{
         console.log("done");
     }
 }
 ​

async和await是JavaScript特性,这里就不深入探究了。

8.5 异步流

promise对象是便于建模,排列和编排未来的值,但是如果有多个值在未来的不同时刻产出。这种情况并不少见,比如从文件系统读取文。

这样的操作有不同的建模方式,最为常见的是事件发射器(Nodejs EventEmitter)或响应编程库(RxJS)。这两种方式之间区别就像回调和promise对象一样:事件简单轻量,而响应式编程库更强大,可以编排和排列事件流。

事件发射器

事件发射器提供的API用于通道中发射事件,并监控通道中的事件:

 interface Emitter{
     emit(channel:string,value:unknown):void
     on(channel:string,f:(value:string)=>void):void
 }
 ​

发射器是JavaScript中一种常见的设计模式,使用DOM事件,或Nodejs EventEmitter模块可能涉及。

 type RedisClient = {
     on<E extends keyof Events>(
         event:E,
         f:(arg:Events[E])=>void
     ):void
     emit<E extends keyof Events>(
         event:E,
         arg:Events[E]
     ):void
 }
 type RedisClient = {
     on(event:'ready',f:()=>void):void
     on(event:'error',f:(e:Error)=>void):void
     on(event:'reconnecting',f:(params:{attempt:number,delay:number})=>void):void
     
 }
     // 优化
 type Events = {
     ready:void
     error:Error
     reconnecting:{attempt:number,delay:number}
 }
 type RedisClient = {
     on<E extends keyof Events>(
         event:E,
         f:(arg:Events[E])=>void
     ):void
 }

把事件名称和参数提取结构中,然后映射结构生成监听器发射器,这是Typescript中常见的模式

8.6 多线程类型安全

目前我们讨论的异步程序基本运行在一个CPU线程中,不过,一些CPU密集型任务可能需要并行,把一项任务分到多个线程中。这么做可能是为了提升速度,可能是想让主线程空闲出来,继续相应后序操作。本节介绍编写安全的并行程序。涵盖浏览器服务器

8.6.1 在浏览器中:使用Web职程

浏览器大都支持web职程(worker)处理多线程。未免某些操作(例如CPU密集型任务)阻塞主线程,导致UI无响应,我们可以在JavaScript主线程中创建一些职称(严格受限的后后台代码,而Promise和setTimeout等异步PAI只是并发运行代码。职程在另一个cpu运行代码)。web职程可以处理网络请求,文件系统写入等操作,不过有一些小限制。

Web职程是浏览器提供的API,设计人员安全性提出了更高的要求,这里的“安全性”指的是,内存安全。如果多个线程读取同一块内存,很容易遇到各种并发问题。例如不确定性死锁

tsconfig.json加入lib”:[“dom”,”es20120”,”webworker”]支持webworker

 // 主线程代码
 let worker = new Worker("./workerScript.js")
 worker.postMessage("some data")
 ​
 // 接受其他线程
 worker.onmessage = e=>{
     console.log(e.data);
 }
 // 并行线程,只能在浏览环境才能用
 onmessage = e=>{
     console.log("child",e.data);
    
 // 发给主线程 
     postMessage("receiver"+e.data)
 }
 ​

这样传递消息简单,但是没有类型,无法确保正确处理可能发送的所有消息类型。

类型安全协议

我们知道如何两个线程之间传递消息,那么,若想指明一个命令始终收到特定事件的响应怎么做呢?

我们可以选择在职程 中定义函数,把参数发给该函数,再把结果发送回来。

我们构建一个计算引擎,让他支持三种运算:求矩阵行列式,计算两个矩阵的点积和求逆矩阵。

 type Matrix = number[][]
 type MatrixProtocol = {
     determinant:{
         in:[Matrix],
         out:number
     },
     'dot-product':{
         in:[Matrix,Matrix]
         out:Matrix
     },
     invert:{
         in:[Matrix],
         out:Matrix
     }
 }

我们将在主线程中定义矩阵,运算则交给职程。我们对不安全的操作(职程发送和接受不带类型的消息)进行包装,把带类型的API开放给使用方。我们定义一个简单请求响应协议Protocol列出职程可执行的操作,并为预期的输入输出声明类型。

 type Protocol = { // 1
     [command:string]:{
         in:unknown[],
         out:unknown
     }
 }
 function createProtocol<P extends Protocol>(script:string){ //2
     return <K extends keyof P>(command:K)=>(...args:P[K]['in'])=>{
         return new Promise<P[K]['out']>((resolve,reject)=>{
             let worker = new Worker(script)
             worker.onerror = reject
             worker.onmessage = event => resolve(event.data.data)
             worker.postMessage({command,args})
         })
     }
 }
 // 
 let runWithMatrixProtocol = createProtocol<MatrixProtocol>(
     'MatricWorkerScript.js'
 )
 let parallelDeterminant = runWithMatrixProtocol('determinant')
 ​
 parallelDeterminant([[1,2],[3,4]])
 .then(determinant=>console.log(determinant))
 ​
 type Matrix = number[][]
 type MatrixProtocol = {
     determinant:{
         in:[Matrix],
         out:number
     },
     'dot-product':{
         in:[Matrix,Matrix]
         out:Matrix
     },
     invert:{
         in:[Matrix],
         out:Matrix
     }
 }
 ​
 type Protocol = { // 1
     [command:string]:{
         in:unknown[],
         out:unknown
     }
 }
 function createProtocol<P extends Protocol>(script:string){ //2
     return <K extends keyof P>(command:K)=>{
         return (...args:P[K]['in'])=>{
             return new Promise<P[K]['out']>((resolve,reject)=>{
                 let worker = new Worker(script)
                 worker.onerror = reject
                 worker.onmessage = event => resolve(event.data.data)
                 worker.postMessage({command,args})
             })
         }
     }
     
 }
 // 
 let runWithMatrixProtocol = createProtocol<MatrixProtocol>(
     'MatricWorkerScript.js'
 )
 let parallelDeterminant = runWithMatrixProtocol('determinant')
 ​
 parallelDeterminant([[1,2],[3,4]])
 .then(determinant=>console.log(determinant))

8.6.2 在nodejs使用子进程

 // main
 import {fork} from 'child_process'
 let child = fork("./child.js")
 ​
 child.on("message",data=>{
     
 })
 child.send({type:'syn',data:[3]})

原文地址:https://blog.csdn.net/qq_47658204/article/details/132058211

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_45978.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注