1. UnboundedMailbox is the default unbounded MailboxType used by Akka Actors
/** * MailboxType is a factory to create MessageQueues for an optionally * provided ActorContext. * * Possibly Important Notice * * When implementing a custom mailbox type, be aware that there is special * semantics attached to `system.actorOf()` in that sending to the returned * ActorRef may—for a short period of time—enqueue the messages first in a * dummy queue. Top-level actors are created in two steps, and only after the * guardian actor has performed that second step will all previously sent * messages be transferred from the dummy queue into the real mailbox. */trait MailboxType { def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue}
trait ProducesMessageQueue[T <: MessageQueue]
MailboxType特质的create方法接受两个参数owner和system,owner表示“消息队列”的所有者,system表示 ???
/** * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors. */case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { // 使用this关键字来调用构造函数。 // 它演示了如何从其他构造函数调用构造函数。必须确保this必须放在构造函数中的第一个语句,同时调用其他构造函数this(),否则编译器会抛出错误。 // “=”等号右边调用的this()是指向的UnboundedMailBox()类 def this(settings: ActorSystem.Settings, config: Config) = this() // 重写MailboxType的create()方法,并创建UnboundedMailbox.MessgaeQueue对象 final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedMailbox.MessageQueue}
object UnboundedMailbox { class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { final def queue: Queue[Envelope] = this }}
final case class Envelope private (val message: Any, val sender: ActorRef)
/** * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue, * i.e. blocking enqueue with timeout. */trait BoundedMessageQueueSemantics { def pushTimeOut: Duration}trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { // 入队,这里是入队了一个信封,貌似receiver没有用到?不明白为什么要有这个receiver,可能是为了明显标记是发给那个Actor的吧 def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle // 出队,出队就是从队列中取出一个元素(一封信) def dequeue(): Envelope = queue.poll()}
/** * This is a marker trait for message queues which support multiple consumers, * as is required by the BalancingDispatcher. */trait MultipleConsumerSemantics/** * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue. */trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics { def queue: Queue[Envelope] // 消息的数量 (队列的元素个数) def numberOfMessages = queue.size // 是否包含消息 (队列是否为空) def hasMessages = !queue.isEmpty // 清空消息 (清空队列) def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { if (hasMessages) { // 获取第一个消息 var envelope = dequeue while (envelope ne null) { // 如果消息不为空,则不断向deadLetters发送消息 // deadLetters也是一个消息队列actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue deadLetters.enqueue(owner, envelope) envelope = dequeue } } }}
其中的queue是重写了QueueBasedMessageQueue中定义的queue,def queue: Queue[Envelope]。
3. 邮箱是akka里面是怎么使用的?
val testActor: ActorRef = context.actorOf( TestActor.props().withDispatcher("test-actor-dispatcher"), TestActor.Name)