ScalaActor并发编程模型 发表于 2023-06-13 更新于 2023-06-13
字数总计: 3.7k 阅读时长: 15分钟 阅读量: 南京
Scala Actor 并发编程模型 Actor
并发编程模型,是 Scala 提供的一直与 Java 完全不一样的并发编程模型,是一直基于事件模型的并发机制。Actor
并发编程模型是一种不共享数据,依赖消息传递的并发编程模型,有效避免了资源争夺、死锁等现象。
Actor
是一种基于事件(消息)的并发编程模型,不共享数据,有效避免了共享数据加锁问题。
Java并发编程对比 Actor 并发编程模型
Java并发编程
Actor
并发编程模型
共享数据锁模型(share data and lock)
share nothing
每个 object 都有一个monitor
,用来监视对共享数据的访问
不共享数据,Actor
直接通过Message
通讯
加锁代码使用synchronized
标识
死锁问题
每个Actor
内部是顺序执行的
每个线程内部是顺序执行的
每个Actor
内部是顺序执行的
Scala 在 2.11 及之后的版本中加入了Akka
并发编程框架,Actor
并发编程模型已经被废弃了。
创建 Actor 可以通过类(Class)或者单例对象(Object)继承Actor
特质的方式来创建Actor
对象
通过类实现创建Actor
对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class myActor1 extends Actor { override def act (): Unit = { for (i <- 1 to 10 ) println("myActor1---" + i) } } class myActor2 extends Actor { override def act (): Unit = { for (i <- 11 to 20 ) println("myActor2---" + i) } } def main (args: Array [String ]): Unit = { val myActor1: myActor1 = new myActor1() myActor1.start() new myActor2().start() }
通过单例对象实现创建Actor
对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 object myActor1 extends Actor { override def act (): Unit = { for (i <- 1 to 10 ) println("myActor1---" + i) } } object myActor2 extends Actor { override def act (): Unit = { for (i <- 11 to 20 ) println("myActor2---" + i) } } def main (args: Array [String ]): Unit = { myActor1.start() myActor2.start() }
发送以及接收消息 发送消息
!
: 发送异步消息,没有返回值
!?
: 发送同步消息,等待返回值
!!
: 发送异步消息,返回值是 Future[Any]
如下给myActor1
发送一个异步字符串消息
接收消息 Actor
中使用receive
方法来接收消息,需要传入一个偏函数receive
方法值接收一次消息,接收完成后进行执行act()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 object ActorSender extends Actor { override def act (): Unit = { ActorReceiver ! "你好" } } object ActorReceiver extends Actor { override def act (): Unit = { receive { case msg: String => println(msg) } } } def main (args: Array [String ]): Unit = { ActorSender .start() ActorReceiver .start() }
持续发送和接收消息 用 while
循环来持续不断的发送和接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 object ActorSender extends Actor { override def act (): Unit = { while (true ){ ActorReceiver ! "你好" Thread sleep 1000 } } } object ActorReceiver extends Actor { override def act (): Unit = { while (true ){ receive { case msg: String => println(msg) } } } } def main (args: Array [String ]): Unit = { ActorSender .start() ActorReceiver .start() }
问题 :
如果当前Actor
没有接收到消息,线程就会处于阻塞状态。如果很多的Actor
,就会导致很多线程处于阻塞状态
每次有新的消息进来,都会重新创建新的线程来处理。这种频繁的线程创建、销毁和切换会影响影响效率
解决办法 : 通过loop()
结合react()
来复用多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 object ActorSender extends Actor { override def act (): Unit = { loop { ActorReceiver ! "你好" Thread sleep 1000 } } } object ActorReceiver extends Actor { override def act (): Unit = { loop { react { case msg: String => println(msg) } } } } def main (args: Array [String ]): Unit = { ActorSender .start() ActorReceiver .start() }
发送和接收自定义消息 在此之前我们发送的消息都是字符串类型的,显然这样并不常见,因此我们需要能够自定义发送的消息类型。例如可以用样例类
封装消息,然后进行发送处理
发送接收同步有返回消息
使用!?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 case class Message (id: Int , message: String )case class ReplyMessage (message: String , name: String )object MessageActor extends Actor { override def act (): Unit = { loop { react { case Message (id: Int , message: String ) => println(s"id = $id ,message = $message " ) sender ! ReplyMessage ("你也好" ,"MessageActor" ) } } } } def main (args: Array [String ]): Unit = { MessageActor .start() val res: Any = MessageActor !? Message (1 , "hello" ) val replyMessage: ReplyMessage = res.asInstanceOf[ReplyMessage ] println(s"MainActor接收到MessageActor返回的消息是: ${replyMessage.message} and ${replyMessage.name} " ) }
发送异步无返回消息
使用!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 object MessageActor extends Actor { override def act (): Unit = { loop { react { case Message (id: Int , message: String ) => println(s"id = $id ,message = $message " ) } } } } def main (args: Array [String ]): Unit = { MessageActor .start() MessageActor ! Message (1 , "hello" ) }
发送接收异步有返回消息
使用!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 object MessageActor extends Actor { override def act (): Unit = { loop { react { case Message (id: Int , message: String ) => println(s"id = $id ,message = $message " ) sender ! ReplyMessage ("你也好" , "MessageActor" ) } } } } } def main (args: Array [String ]): Unit = { MessageActor .start() MessageActor !! Message (1 , "hello" ) println("继续执行..." ) val replyMessage: ReplyMessage = res.asInstanceOf[ReplyMessage ] println(s"MainActor接收到MessageActor返回的消息是: ${replyMessage.message} from ${replyMessage.name} " ) println("MainActor接收成功..." ) }
同步消息和异步消息区别 同步消息
: 必须接收到回复信息,程序才会继续执行异步消息
: 即使没有接收到回复信息,程序也会继续执行
Actor 实现 WordCount 案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package com.chongyan.wordcountimport java.io.File import scala.actors.{Actor , Future }import scala.io.Source object MainActor { def main (args: Array [String ]): Unit = { var dir = "./data/" var fileNameList = new File (dir).list().toList val fileDirList: List [String ] = fileNameList.map(dir + _) case class WordCountTask (fileName: String ) case class WordCountResult (WordCountList : List [(String , Int )]) class WordCountActor extends Actor { override def act (): Unit = { loop { react { case WordCountTask (fileName) => println(s"获取到的任务是 $fileName " ) val linesList: List [String ] = Source .fromFile(fileName).getLines().toList val wordsList: List [String ] = linesList.flatMap(_.split(" " )) val wordsTimesList: List [(String , Int )] = wordsList.map((_, 1 )) val wordsCountList: Map [String , List [(String , Int )]] = wordsTimesList.groupBy(_._1) val WordCountList : List [(String , Int )] = wordsCountList.map { wordsCountMap => (wordsCountMap._1, wordsCountMap._2.map(_._2).sum) }.toList println(WordCountList ) sender ! WordCountResult (WordCountList ) } } } } val wordCountActorsList: List [WordCountActor ] = fileNameList.map(_ => new WordCountActor ) val actorWithFile: List [(WordCountActor , String )] = wordCountActorsList.zip(fileDirList) val fatureList: List [Future [Any ]] = actorWithFile.map { actorAndFile => val actor: WordCountActor = actorAndFile._1 val fileName: String = actorAndFile._2 actor.start() val fature: Future [Any ] = actor !! WordCountTask (fileName) fature } while (fatureList.exists(!_.isSet)) {} val WordCountLists : List [List [(String , Int )]] = fatureList .map(_.apply() .asInstanceOf[WordCountResult ] .WordCountList ) val WordCountList : List [(String , Int )] = WordCountLists .flatten.groupBy(_._1).map { wordsCountMap => (wordsCountMap._1, wordsCountMap._2.map(_._2).sum) }.toList println(WordCountList ) } }
Scala Akka 并发编程框架 什么是 Akka
? Akka
是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用程序工具包。Akka
是使用 Scala 开发的库,可以支持 Scala 和 Java 语言来开发基于 Akka
的应用程序。
Akka
的特性
通过基于异步非阻塞、高性能的事件驱动编程模型
内置容错机制,是循序Actor
在出错是进行恢复或者重置操作
超级轻量级的事件处理(每 GB 对内存可以运行几百万Actor
)
使用Akka
可以在单机上构建高并发程序,也可以在网络中构建分布式程序
Akka
通讯过程
学生创建一个ActorSystem
通过ActorSystem
来创建一个ActorRef
(老师的引用),并将消息发送给ActorRef
ActorRef
将消息发送给Message Dispatcher
(消息分发器)
Message Dispatcher
将消息按照顺序保存到目标Actor的MailBox中
Message Dispatcher
将MailBox
放到一个线程中
MailBox
按照顺序取出消息,最终将它递给TeacherActor
接受的方法中
创建Actor
Akka
中,也是基于Actor
来进行编程的。类似于Actor
。但是Akka
中的Actor
的编写、创建方法和之前有一些不一样。
API介绍
ActorSystem
: 它负责创建和监督Actor
在Akka
中,ActorSystem
是一个重量级的结构,它需要分配多个线程
在实际应用中, ActorSystem
通常是一个单例对象, 可以使用它创建很多Actor
直接使用context.system
就可以获取到管理该Actor
的ActorSystem
的引用
定义类或者单例对象继承Actor
(注意: 要导入akka.actor包下的Actor)
实现receive
方法,receive
方法中直接处理消息 即可,不需要添加loop
和react
方法调用. Akka会自动调用receive来接收消息
还可以实现preStart()
方法, 该方法在Actor
对象构建后执行,在Actor
生命周期中仅执行一次.
要创建Akka的Actor
,必须要先获取创建一个ActorSystem
。需要给ActorSystem
指定一个名称,并可以去加载一些配置项
调用ActorSystem.actorOf(Props(Actor对象), "Actor名字")
来加载Actor
Actor Path 每一个Actor
都有一个Path
,这个路径可以被外部引用。路径的格式如下:
Actor类型
路径
示例
本地Actor
akka://actorSystem名称/user/Actor名称
akka://SimpleAkkaDemo/user/senderActor
远程Actor
akka.tcp://my-sys@ip地址:port/user/Actor名称
akka.tcp://192.168.10.17:5678/user/service-b
创建实例
定义SenderActor
类
定义ReceiverActor
类
定义Entrance
主运行类
1 2 3 4 5 6 7 8 9 10 11 import akka.actor.Actor object SenderActor extends Actor { override def receive : Receive = { case x => println(x) } }
1 2 3 4 5 6 7 8 import akka.actor.Actor object ReceiverActor extends Actor { override def receive : Receive = { case x => println(x) } }
1 2 3 4 5 6 7 8 object Entrance { def main (args: Array [String ]): Unit = { val actorSystem: ActorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val senderActor: ActorRef = actorSystem.actorOf(Props (SenderActor ), "senderActor" ) val receiverActor: ActorRef = actorSystem.actorOf(Props (ReceiverActor ), "receiverActor" ) } }
发送和接收消息
使用样例类封装消息
SubmitTaskMessage
提交任务消息
SuccessSubmitTaskMessage
任务提交成功消息
使用!
发送消息
SenderActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 object SenderActor extends Actor { override def receive : Receive = { case "start" => val receiverActorSelection: ActorSelection = context.actorSelection("akka://actorSystem/user/receiverActor" ) receiverActorSelection ! SubmitTaskMessage ("我是 SenderActor ,我在给你发消息" ) case SuccessSubmitTaskMessage (msg) => println(s"SenderActor 接收到的消息是,$msg " ) } }
ReceiverActor.scala
1 2 3 4 5 6 7 object ReceiverActor extends Actor { override def receive : Receive = { case SubmitTaskMessage (msg) => println(s"我是 ReceiverActor ,我接收到的消息是:$msg " ) sender ! SuccessSubmitTaskMessage ("我是 ReceiverActor ,接收成功" ) } }
Entrance.scala
1 2 3 4 5 6 7 8 object Entrance { def main (args: Array [String ]): Unit = { val actorSystem: ActorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val senderActor: ActorRef = actorSystem.actorOf(Props (SenderActor ), "senderActor" ) val receiverActor: ActorRef = actorSystem.actorOf(Props (ReceiverActor ), "receiverActor" ) senderActor ! "start" } }
Akka
定时任务通过 ActorSystem.scheduler.schedule()方法
, 启动定时任务
1 2 3 4 5 6 final def schedule ( initialDelay : FiniteDuration , interval : FiniteDuration , receiver : ActorRef , message : Any ) (implicit executor : ExecutionContext , sender : ActorRef = {})
1 2 3 4 5 final def schedule ( initialDelay : FiniteDuration , interval : FiniteDuration )(f : => Unit ) (implicit executor : ExecutionContext )
具体实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 object MainActor { object ReceiverActor extends Actor { override def receive : Receive = { case msg => println(msg) } } def main (args: Array [String ]): Unit = { val actorSystem: ActorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val receiverActor: ActorRef = actorSystem.actorOf(Props (ReceiverActor ), "receiverActor" ) import actorSystem.dispatcher import scala.concurrent.duration._ actorSystem.scheduler.schedule(0 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 方式 1..." ) actorSystem.scheduler.schedule(0 seconds, 2 seconds) { receiverActor ! "Hello ReceiverActor!, 方式 2..." } } }
实现两个进程间的通信 基于Akka
实现两个进程 之间发送、接收消息。
WorkerActor
启动后去连接MasterActor
,并发送消息给MasterActor
WorkerActor
在接收到消息后,再回复消息给MasterActor
MasterActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 package com.chongyan.masterAndWorker.masterimport akka.actor.Actor object MasterActor extends Actor { override def receive : Receive = { case "setup" => println("MasterActor started!" ) case "connect" => println("MasterActor, received: connect!" ) sender ! "success" } }
MasterEntrance.scala
1 2 3 4 5 6 7 8 9 10 11 package com.chongyan.masterAndWorker.masterimport akka.actor.{ActorRef , ActorSystem , Props }import com.typesafe.config.ConfigFactory object MasterEntrance { def main (args: Array [String ]): Unit = { val actorSystem: ActorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val masterActor: ActorRef = actorSystem.actorOf(Props (MasterActor ), "masterActor" ) masterActor ! "setup" } }
WorkerActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.chongyan.masterAndWorker.workerimport akka.actor.{Actor , ActorSelection }object WorkerActor extends Actor { override def receive : Receive = { case "setup" => println("WorkerActor started!" ) val masterActor: ActorSelection = context .system .actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor" ) masterActor ! "connect" case "success" => println("MasterActor, received: success!" ) } }
WorkerEntrance.scala
1 2 3 4 5 6 7 8 9 10 11 package com.chongyan.masterAndWorker.workerimport akka.actor.{ActorRef , ActorSystem , Props }import com.typesafe.config.ConfigFactory object WorkerEntrance { def main (args: Array [String ]): Unit = { val actorSystem: ActorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val workerActor: ActorRef = actorSystem.actorOf(Props (WorkerActor ), "workerActor" ) workerActor ! "setup" } }