本篇內容介紹了“Scala Actor多線程怎么理解”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Scala Actor是Scala里多線程的基礎,核心思想是用消息傳遞來進行線程間的信息共享和同步。
Scala Actor線程模型可以這樣理解:所有Actor共享一個線程池,總的線程個數可以配置,也可以根據CPU個數決定;當一個Actor啟動之后,Scala分配一個線程給它使用,如果使用receive模型,這個線程就一直為該Actor所有,如果使用react模型,Scala執行完react方法后拋出異常,則該線程就可以被其它Actor使用。
下面看一些核心代碼。
def start(): Actor = synchronized { // Reset various flags. // // Note that we do *not* reset `trapExit`. The reason is that // users should be able to set the field in the constructor // and before `act` is called. exitReason = 'normal exiting = false shouldExit = false scheduler execute { ActorGC.newActor(Actor.this) (new Reaction(Actor.this)).run() } this }
其中Reaction實現Runnable接口,scheduler基本相當于是一個線程池,所以調用start方法之后會有一個線程來為該Actor服務。
使用receive模型。
def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self == this, "receive from channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt isSuspended = true suspendActor() } else { received = Some(qel.msg) sessions = qel.session :: sessions } waitingFor = waitingForNone isSuspended = false } val result = f(received.get) sessions = sessions.tail result
如果當前mailbox里面沒有可以處理的消息,調用suspendActor,該方法會調用wait;如果有消息,這調用PartialFunction進行處理。
使用react模型。
def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt continuation = f isDetached = true } else { sessions = List(qel.session) scheduleActor(f, qel.msg) } throw new SuspendActorException }
如果當前mailbox沒有可以處理的消息,設置waitingFor和continuation,這兩個變量會在接收到消息的時候使用;如果有消息,則調用scheduleActor,該方法會在線程池里選擇一個新的線程來處理,具體的處理方法也是由PartialFunction決定。不管是哪條路徑,react都會立即返回,或者說是立即拋出異常,結束該線程的執行,這樣該線程就可以被其它Actor使用。
再來看看接收消息的處理代碼。
def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { if (waitingFor(msg)) { received = Some(msg) if (isSuspended) sessions = replyTo :: sessions else sessions = List(replyTo) waitingFor = waitingForNone if (!onTimeout.isEmpty) { onTimeout.get.cancel() onTimeout = None } if (isSuspended) resumeActor() else // assert continuation != null scheduler.execute(new Reaction(this, continuation, msg)) } else { mailbox.append(msg, replyTo) }
如果當前沒有在等待消息或者接收到的消息不能處理,就丟到mailbox里去;相反,則進行消息的處理。這里對于receive模型和react模型就有了分支:如果isSuspended為true,表示是receive模型,并且線程在wait,就調用resumeActor,該方法會調用notify;否則就是react模型,同樣在線程池里選擇一個線程進行處理。
“Scala Actor多線程怎么理解”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。