博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka actors默认邮箱介绍
阅读量:6827 次
发布时间:2019-06-26

本文共 4097 字,大约阅读时间需要 13 分钟。

 1. UnboundedMailbox is the default unbounded MailboxType used by Akka Actors

 ”无界邮箱“ 是akka actors默认使用的邮箱,

UnboundedMailbox继承了MailboxType

/** * MailboxType is a factory to create MessageQueues for an optionally * provided ActorContext. * * Possibly Important Notice * * When implementing a custom mailbox type, be aware that there is special * semantics attached to `system.actorOf()` in that sending to the returned * ActorRef may—for a short period of time—enqueue the messages first in a * dummy queue. Top-level actors are created in two steps, and only after the * guardian actor has performed that second step will all previously sent * messages be transferred from the dummy queue into the real mailbox. */trait MailboxType {  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue}

 和ProducesMessageQueue

trait ProducesMessageQueue[T <: MessageQueue]

 MailboxType特质的create方法接受两个参数owner和system,owner表示“消息队列”的所有者,system表示 ???

UnboundedMailbox的定义如下:

/** * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors. */case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {  // 使用this关键字来调用构造函数。  // 它演示了如何从其他构造函数调用构造函数。必须确保this必须放在构造函数中的第一个语句,同时调用其他构造函数this(),否则编译器会抛出错误。  // “=”等号右边调用的this()是指向的UnboundedMailBox()类  def this(settings: ActorSystem.Settings, config: Config) = this()  // 重写MailboxType的create()方法,并创建UnboundedMailbox.MessgaeQueue对象  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =    new UnboundedMailbox.MessageQueue}

 UnboundedMailbox中的create()方法中创建的UnboundedMailbox.MessgaeQueue对象是

object UnboundedMailbox {  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {    final def queue: Queue[Envelope] = this  }}  

   MessageQueue继承了ConCurrentLinkedQueue,并且ConCurrentLinkedQueue是存放的Envelope

final case class Envelope private (val message: Any, val sender: ActorRef)

(信封)对象,信封对象中包含一个message消息和发送者sender的ActorRef。

此外,MessageQueue还with了一个UnboundedQueueBasedMessageQueue

/** * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue, * i.e. blocking enqueue with timeout. */trait BoundedMessageQueueSemantics {  def pushTimeOut: Duration}trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {  // 入队,这里是入队了一个信封,貌似receiver没有用到?不明白为什么要有这个receiver,可能是为了明显标记是发给那个Actor的吧  def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle  // 出队,出队就是从队列中取出一个元素(一封信)  def dequeue(): Envelope = queue.poll()}

  UnboundedQueueBasedMessageQueue中实现了从QueueBasedMessageQueue继承来的enqueue()和dequeue()方法。更确切的话是从MessageQueue继承来的,因为QueueBasedMessageQueue是继承了MessageQueue,代码如下:

/** * This is a marker trait for message queues which support multiple consumers, * as is required by the BalancingDispatcher. */trait MultipleConsumerSemantics/** * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue. */trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {  def queue: Queue[Envelope]  // 消息的数量 (队列的元素个数)  def numberOfMessages = queue.size  // 是否包含消息 (队列是否为空)  def hasMessages = !queue.isEmpty  // 清空消息 (清空队列)  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {    if (hasMessages) {      // 获取第一个消息      var envelope = dequeue      while (envelope ne null) {        // 如果消息不为空,则不断向deadLetters发送消息        // deadLetters也是一个消息队列actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue        deadLetters.enqueue(owner, envelope)        envelope = dequeue      }    }  }}

  从QueueBasedMessageQueue的定义来看,其中包含了几个重要的方法。还定义了一个queue,回顾一下UnboundedMailbox的定义

object UnboundedMailbox {  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {    final def queue: Queue[Envelope] = this  }}  

   其中的queue是重写了QueueBasedMessageQueue中定义的queue,def queue: Queue[Envelope]。

 

3. 邮箱是akka里面是怎么使用的? 

如果我们是使用的ActorRef去创建一个actor,例如:

val testActor: ActorRef = context.actorOf(      TestActor.props().withDispatcher("test-actor-dispatcher"),      TestActor.Name)

   

转载地址:http://lcgzl.baihongyu.com/

你可能感兴趣的文章
Struts2学习(六):访问隐藏的request和session
查看>>
结合项目实例 回顾传统设计模式(四)工厂模式(简单工厂、普通工厂、抽象工厂)...
查看>>
我了解的西安软件外包业务
查看>>
第57期:LPWAN技术之超窄带(UNB)浅析
查看>>
Frankfan7你问我答之一
查看>>
Windows XP启用ADMIN$默认管理共享
查看>>
RDIFramework.NET V2.9版本多语言的实现
查看>>
新内核的编译安装
查看>>
Azure实例级公共IP
查看>>
停电,导致DC无法启动
查看>>
MariaDB七之双主复制
查看>>
Sencha touch实践(1)在ios,android上变web app为native app
查看>>
XNA游戏:重力感应
查看>>
DNS服务部署的那点事儿
查看>>
【云计算的1024种玩法】使用 MSMTP 实现底层环境的 阿里云·邮件推送服务 兼容...
查看>>
Varnish介绍,安装与配置详解。
查看>>
CentOS bash漏洞威胁恐比“心脏流血”更大
查看>>
LINUX总结
查看>>
SCOM 2016监控IIS 网页服务
查看>>
通用权限管理系统组件 (GPM - General Permissions Manager) 中最简单的例子程序,如何时间通讯录管理...
查看>>