问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

Scala+Future实现异步编程

发布网友 发布时间:2024-09-26 02:56

我来回答

1个回答

热心网友 时间:2024-10-07 06:09

多核处理器以及并行任务的逐渐普及,人们对异步编程也越来越关注。Scala标准库中提供的Future允许你在得到真正的执行结果之前,就允许通过map,filter等集合操作得到下一个变换之后的异步结果。

我们无需再以阻塞的方式等待每一步结果,而是使用Future快速构造出一个异步的,对一系列不可变的结果的操作流水线出来。

早在笔者之前介绍的Akka,Play等实战Demo中,我们就已经接触过Future了。在本专题中,将详细地介绍如何正确地使用它。

坎坷的线程同步控制

Java为每一个对象关联了逻辑监视器(monitor),用来控制对数据的多线程访问。通过这种模型,我们来决定哪些数据可以被多线程共享,并使用synchronized关键字“加锁”。

要用锁模型来创建一个健壮的多线程应用实际上是一件非常困难的事情。对于每个需要被共享的数据,我们都要为它上锁,并且确保不会引发死锁问题。然而,即便我们主动对数据上锁,它们也不是在编译期间就固定的,在运行期间,程序仍然可以任意地创建新的锁。

后来,Java提供了java.util.concurrent包来提供更高级别的抽象同步,至少要比自己手动通过各种同步语法来实现不稳定的同步机制要来得快。然而,这类工具包仍然是基于共享数据和锁的,因此在本质上没有解决该类模型的种种困难。

在Scala程序中创建第一个Future

ScalaFuture在相当程度上减少了程序员对共享数据进行和锁处理的负担。如果某个函数的执行结果返回的是一个Future,则它意味着将返回另一个要被异步执行的计算,而至于哪个线程来处理这个稍后的异步计算,将由Scala提供的执行上下文(ExecutionContext)来决定。

因此,在使用Future实现异步编程之前,首先需要将执行上下文导入进来:

importscala.concurrent.ExecutionContext.Implicits.global

这几乎是一个必选项,否则,程序在编译时将会报错。我们通过Future伴生对象提供的apply方法创建第一份“未来计划”:

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}

有两种方法确定这个异步计算是否已经得到结果:

调用future.isCompleted,如果异步计算还未执行完毕,则返回false。

调用future.value,如果计算完毕,则返回Some(Sussess(value)),否则返回None。

为什么value方法做了两层包裹呢?首先,需要考虑到这个异步计算是否执行完毕。因此最外层返回的是一个Option类型,如果有计算结果,则返回,否则None。

另外,计算结果也包含了两种情况。如果计算时没有出现错误,则计算结果可以装入Success类中返回。反之,调用value将返回一个Failure。

Try类型

Success和Failure属于Try类,代表着两个异步运算的两个可能结果。它的目的是提供一个在同步计算中类似于try...catch的效果,允许程序员自行处理返回Failure的情况。

在异步编程当中,try/catch语句将不再有效,因为Future计算经常都在别的线程当中执行,而导致原线程当中并不能捕捉到异常。此时,大写的Try类型就排上用场了:如果某个异步计算抛出了Failure,则说明这个计算过程当中出现了一些意外。

对Future进行流式处理

某个异步计算的Future可以通过map,filter等操作衔接到另一个异步计算中。例如:

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.globalvalfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=future1.map(p=>{//两秒后执行这个计算。Thread.sleep(2000)println("执行future2")p+5})

第一个异步计算会在3秒后执行,并返回一个Int值类型。在理想情况下,当第一个异步计算执行完毕后,它的下一步将是把刚才的返回值进行加操作。这个流程又被命名为future2。显然,它的计算返回值仍然是一个Future类型。

对于主线程而言,它将在大约5秒之后得到一个结果:9。

使用for表达式对Future做变换

Scala的for表达式功能要比Java强大得多,包括用于组合Future计算事件。

基于上述的异步运算future1和future2,我们创建第三个运算future3,对刚才的两个运算结果进行加和。代码清单如下:

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.global//观察开始时间println(newjava.util.Date())valfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=Future{Thread.sleep(2000)println("执行future2")5}valfuture3:Future[Int]=for{x<-future1y<-future2}yield{x+y}//我们这里使用了Await等待结果调用完毕,不*等待时间。println(Await.result(future3,Duration.Inf))//观察结束时间println(newjava.util.Date())

for循环在底层实际上会将这段代码转换为串行化的flatmap句式:future1.faltMap(x=>future2.map(y=>x+y))。从主程序完成到完成计算,总共花费了3秒的时间(而不是5秒),因为上述的代码都是在异步的环境中执行完成的。

我们可以画出一个简单的PETRI图出来,并求出这个图的最短完成时间(详情参考离散数学科目:关键路径的相关知识)。

注意,如果使用for表达式对Future做变换,一定要将Future声明在for循环的前面,否则for表达式将在串行的环境下完成它们。

创建Success,Failure

Future提供诸多已经完成的future的工厂方法:successful,failed以及fromTry。这些方法不需要手动导入上下文。

使用successful方法来创建一个已经完成的future:

valfuture:Future[Int]=Future.successful({println("返回一个已经完成的Success[T]")100})//Some(Success(100))println(future.value)

使用failed方法创建一个已经完成,但是出现异常的future:

valfuture:Future[Nothing]=Future.failed({println("该方法用于返回一个Failure[T]")newException("Oops!")})//Some(Failure(java.lang.Exception:Oops!))println(future.value)

如果不确定抛出Try[+T]的哪一种情况,则调用fromTry:

valfuture:Future[Nothing]=Future.fromTry({println("可能返回Success或者Failure")//Success(100)Failure(newException("Oops!"))})println(future.value)两种等待方式Await同步等待

本文刚才所提到的Await是一种同步等待机制,主线程会在有限的时间内等待某个Future进行。

我们另引入一个包:Scala.concurrent.ration._,这样就允许我们使用2second这种方式来表示我们的最大等待时间了(笔者曾经在隐式转换章节中介绍过如何实现它)。

Await主要有两个方法。第一个用法是调用result另主线程进入阻塞等待,直到获取该future的返回值。

valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)30}//主程序会在3秒内等待该结果,并赋值。valint:Int=Await.result(intFuture,3second)println(int)

一般用于需要获取到该future的返回值才能做进一步操作的情况,如果只关心该future的完成状态,可以调用ready方法。当future仍处于工作状态时,主线程会等待至多3秒。

Await.ready(intFuture,3second)

另外,通过Thread.currentThread().getName可以发现,此future是由另一个线程执行的:ForkJoinPool-X-worker-XX。

onComplete异步等待

忠告:如果你已经进入了Future空间内,就尽量不要再使用Await阻塞future的执行。Scala提供注册“回调函数”的方式来令你通过函数副作用获取到某个future在未来返回的值。

valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)30}//Await.ready(intFuture,3second)//和刚才的情况不同,如果主线程不阻塞一会,那么这个程序会提前结束推出。Thread.sleep(3000)varintValue:Int=0intFutureonComplete{caseSuccess(value)=>println(value)//通过代码块副作用获取到这个Future的value返回值。intValue=valuecase_=>println("出现了意外的错误")}

这种方式不会阻塞主线程,为了能看到程序运行结果,我们需要主动调用Thread.sleep让主线程休眠一会,否则程序会立刻结束。onComplete的返回值是一个Unit数据类型。

使用andThen强制保证future的执行顺序

一个future可以绑定多个onComplete。然而,上下文环境并不会保证哪个future的onComplete会被率先触发,而andThen方法保证了回调函数的执行顺序。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{Thread.sleep(2000)println(Thread.currentThread().getName)200}//主程序的onComplete方法的调用顺序不一定intFutureonComplete{caseSuccess(int)=>println(s"thisfuturereturned$int")case_=>println("somethingwronghashappened.")}intFutureonComplete{caseSuccess(int)=>println(s"completedwiththevalueof$int")case_=>println("somethingwronghashappened.")}Thread.sleep(3000)

执行上述的程序,控制台有可能先打印thisfuturereturned$int,也有可能先打印completedwiththevalueof$int。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{Thread.sleep(2000)println(Thread.currentThread().getName)200}intFutureonComplete{caseSuccess(int)=>println(s"thisfuturereturned$int")case_=>println("somethingwronghashappened.")}intFutureandThen{caseSuccess(int)=>println(s"completedwiththevalueof$int")case_=>println("somethingwronghashappened.")}Thread.sleep(3000)

andThen方法会返回原future的一个镜像,并且只会在该future调用完onCompelete方法之后,andThen才会执行。

Promise

当我们不确定future何时会完成时,可以会借助Promise许下一个“承诺”,它表示:在某个未来的时间点,一定能够得到值。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}2

然而,这个Int值的计算实际上委托给了其它的future来完成。受托的Future在计算完结果之后会调用该promise的success方法来“兑现”这个承诺。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}3

考虑到异常情况,除了success方法,Promise还提供了failure,Complete等方法。无论调用哪种方法,一个Promise都只能被使用一次。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}4

随后此promise的future会进入就绪状态,我们使用刚才介绍的onComplete回调函数中"兑现"它的返回值。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}5

PromisedInt在这里充当着代理的作用。它承诺提供的值具体要由哪个future来计算并提供,程序的调用者可能并不关心:它也许是intFuture,也许是IntFuture2。因此,我们仅需要为代理(PromisedInt.future)设置回调函数,而不是其它的future。为了方便理解,这里给出连贯的代码清单:

importscala.concurrent.ExecutionContext.Implicits.globalvalfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}2valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)//promisedInt承诺的值由intFuture真正实现。promisedInt.success(300)promisedInt.failure(newException("可能的错误"))promisedInt.complete(Success(1))}//和刚才的情况不同,如果主线程不阻塞一会,那么这个程序会提前结束退出。Thread.sleep(3000)//主函数只关心promisedInt能否提供值。promisedInt.futureonComplete{caseSuccess(value)=>println(value)case_=>println("出现了意外的错误")}过滤Future的返回值

Scala提供两种方式让你对future的返回值进行检查,或者过滤。filter方法可以对future的结果进行检验。如果该值合法,就进行保留。下面的例子使用filter确保返回值是满足>=30的值。注意,执行filter方法之后得到的是另一个future值。

importscala.concurrent.ExecutionContext.Implicits.globalvaleventualInt=Future{Thread.sleep(3000)print(s"${Thread.currentThread().getName}:returnresult.")12}//检查返回值是否>=30.valcheckRes:Future[Int]=eventualIntfilter(_>=30)//阻塞等待while(!checkRes.isCompleted){Thread.sleep(1000)println("waiting..")}//注册回调。checkResonComplete{caseSuccess(res)=>println(s"result:$res")caseFailure(cause)=>println(s"failedbecauseof$cause")}

如果不满足匹配的要求,则它会返回java.util.NoSuchElementException:Future.filterpredicateisnotsatisfied。你可以在caseFailure(casue)=>中捕获它。

Future的collect方法允许你使用偏函数对结果进行中间变换,可以使用case语句对偏函数进行缩写。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}8处理失败的预期failed方法

Scala提供了几种处理失败的future的方式:包含failed,fallbackTo,recover和recoverWith。举例:如果某个future在执行时出现异常,则failed方法会返回一个成功的Future[Throwable]实例。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{10/0}intFutureonComplete{caseSuccess(int)=>println(int)caseFailure(cause)=>println(s"failedbecauseof$cause")}valeventualThrowable:Future[Throwable]=intFuturefailed//Some(Success(java.lang.ArithmeticException:/byzero))println(eventualThrowable.value)

如果future是被正常执行的,则failed方法反而会抛出NoSuchElement。

fallbackTo方法

fallbackTo方法提供了保险机制,它允许原始的future失败时,转而去运行另一个future2。

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.globalvalfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=future1.map(p=>{//两秒后执行这个计算。Thread.sleep(2000)println("执行future2")p+5})0

无论intFuture执行是否成功,maybeFailed也总是会运行(笔者亲测),因此不要在这里设置一些具有副作用的代码。当intFuture运行成功时,maybeFailed的返回值将被会忽略,它实际返回的是intFuture的返回值。在intFuture运行失败的情况下,maybeFailed方法的返回值才会生效。

如果maybeFailed在执行时也出现了异常,则它抛出的异常将被忽略,捕捉到的将是上一级intFuture的原始异常。

recover方法

另一个方法recover和fallbackTo方法的逻辑类似:如果它捕获到了异常,则允许你根据异常的类型采取对应的策略并返回值。但是如果调用它的原始future执行成功,则这个备用值同样会被忽略。同理,传入recover的偏函数中如果没有对指定异常的处理,则原始future的异常会被透传。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{10/0}valeventualInt:Future[Int]=intFuturerecover{caseex:ArithmeticException=>100caseex:Exception=>200}intFutureonComplete{caseSuccess(int)=>println(int)caseFailure(cause)=>println(s"failedbecauseof$cause")}Thread.sleep(3000)pr
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
草青青,青青草,草上接谢珍珠宝,怕日晒怕风摇,摇看珍珠得起早 谜底是... 一加9R要不要升级ColorOS 13正式版 一加9pro怎么coloros12一加9pro升级coloros12的方法 coloros12支持哪些一加机型?coloros12支持一加机型介绍 一加9pro如何coloros12?一加9pro升级coloros12的方法 一加9pro升级coloros12拍照改善吗 我是一个高中生,职教的,我们班上有5个女生,我喜欢有一个,但追她又有... 自动挡d挡旁边的 -是什么意思? 自动挡位上的加减是什么意思? 宣传这个职位是干什么的 并发异步编程之争:协程(asyncio)到底需不需要加锁?(线程/协程安全/挂起... Golang异步编程,快速理解Goroutines通信、各种锁的使用 如何用社群批量卖货?案例步骤拆解 你是怎样制作出一份出色文案的? 文案销售做到哪四步,你的产品就卖爆呢? 农历2o15年2月16曰是合结婚的曰子吗? 高考励志演讲第一人是房善朝吗?网上有没有房善朝高考励志演讲清晰完整的... ...我想申请助学贷款,但是没有共同借款人怎么办? 哪些人物的演讲最具吸引力? 什么网站的学习资源最多最好最有用? 晚上可以学习的网站有哪些推荐? 支付宝芝麻信用贷款的条件是什么 芝麻信用分能什么贷款吗 自家做面包的方法 买了一盒拨云复光散,打开后里面有一瓶眼药水和一袋粉末? 都是这样的... 河南省中医皮肤研究院骗人的!~根本没有这个药~!什么百草克银丸也是, 天津哪几个茶馆的相声表演最值得一看? 单招没填志愿有补录机会吗 感冒了有火引起的鼻子不通气嗓子难受吃感康一点效果没... 感冒生病有一两个月了,昨天买感康感冒药吃,一下子吃了三粒!后来看药... C# 异步中WaitOne()的第二个布尔类型的参数表示什么? wait和await有什么不同? 猫盒子没有坏。但是老是吊线 高手告诉下 一道数学题呀右面是一盒药里面的药品使用说明,李阿姨的女儿今年3岁,体 ... 刚才说话的直播间在哪呢 抖音直播跟播互动公屏怎么托品?互动方式有哪些? 捷达vs7上市时间? 最新款捷达捷达Vs7首付需要多少万,每月还款多少合适呢? ...和你在一起.我宁愿我的星光全部陨落,因为你的眼睛是我生命里最亮的... 关于动漫Clannad两个世界的问题 动漫Angel Beats!剧情 求一个动漫是后宫的,讲一群人都死了在死后的世界里大概是这个样子名字... ...讲的是人死亡之后心里有结解不开就会去到另一个地方 鼻咽癌鼻出血严重吗 鼻咽癌鼻子出血吗 苹果手机刷机後被指定了设置过得ID来激活,但是忘记原本申请ID的邮箱密... 捡到一部苹果手机被刷机之后需要激活锁 怎么破解呢? 免洗洗手液的坏处 使用免洗洗手液要注意什么 郭美美自曝是他前女友,还有两人合照,继张一山后黄景瑜也翻车了吗? 郭美美曝与黄景瑜交往细节后,他频繁上线9次而不作回应,他在心虚...