package scala.actors import java.util.{TimerTask} /** * Extends the [[scala.actors.Reactor]] * trait with methods to reply to the sender of a message. * Sending a message to a <code>ReplyReactor</code> implicitly * passes a reference to the sender together with the message. * * @author Philipp Haller * * @define actor `ReplyReactor` */ trait InternalReplyReactor extends Reactor[Any] with ReactorCanReply { /* A list of the current senders. The head of the list is * the sender of the message that was received last. */ @volatile private[actors] var senders: List[OutputChannel[Any]] = List() /* This option holds a TimerTask when the actor waits in a * reactWithin. The TimerTask is cancelled when the actor * resumes. * * guarded by this */ private[actors] var onTimeout: Option[TimerTask] = None /** * Returns the $actor which sent the last received message. */ protected[actors] def internalSender: OutputChannel[Any] = senders.head /** * Replies with <code>msg</code> to the sender. */ protected[actors] def reply(msg: Any) { internalSender ! msg } override def !(msg: Any) { send(msg, Actor.rawSelf(scheduler)) } override def forward(msg: Any) { send(msg, Actor.sender) } private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) { synchronized { if (!onTimeout.isEmpty) { onTimeout.get.cancel() onTimeout = None } } senders = List(item._2) super.resumeReceiver(item, handler, onSameThread) } private[actors] override def searchMailbox(startMbox: MQueue[Any], handler: PartialFunction[Any, Any], resumeOnSameThread: Boolean) { var tmpMbox = startMbox var done = false while (!done) { val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => { senders = List(replyTo) handler.isDefinedAt(msg) }) if (tmpMbox ne mailbox) tmpMbox.foreach((m, s) => mailbox.append(m, s)) if (null eq qel) { synchronized { // in mean time new stuff might have arrived if (!sendBuffer.isEmpty) { tmpMbox = new MQueue[Any]("Temp") drainSendBuffer(tmpMbox) // keep going } else { waitingFor = handler // see Reactor.searchMailbox throw Actor.suspendException } } } else { resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) done = true } } } private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable = new ReplyReactorTask(this, fun, handler, msg) protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = { assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") super.react(handler) } /** * Receives a message from this $actor's mailbox within a certain * time span. * * This method never returns. Therefore, the rest of the computation * has to be contained in the actions of the partial function. * * @param msec the time span before timeout * @param handler a partial function with message patterns and actions */ protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = { assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") synchronized { drainSendBuffer(mailbox) } // first, remove spurious TIMEOUT message from mailbox if any mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT) while (true) { val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => { senders = List(replyTo) handler isDefinedAt m }) if (null eq qel) { synchronized { // in mean time new messages might have arrived if (!sendBuffer.isEmpty) { drainSendBuffer(mailbox) // keep going } else if (msec == 0L) { // throws Actor.suspendException resumeReceiver((TIMEOUT, this), handler, false) } else { waitingFor = handler val thisActor = this onTimeout = Some(new TimerTask { def run() { thisActor.send(TIMEOUT, thisActor) } }) Actor.timer.schedule(onTimeout.get, msec) throw Actor.suspendException } } } else resumeReceiver((qel.msg, qel.session), handler, false) } throw Actor.suspendException } override def getState: Actor.State.Value = synchronized { if (waitingFor ne Reactor.waitingForNone) { if (onTimeout.isEmpty) Actor.State.Suspended else Actor.State.TimedSuspended } else _state } }