Again: using actors to schedule things in time

4 replies
Sciss

hi,

i'm still trying to figure out how to create an elegant (timed)
scheduling system using scala actors. eventually i would love to be
able to write something like this:

routine {
  var count = 0
  loopWhile( count < 20 ) {
    println( "Count is " + count ); count = count + 1
    sleep( 666 )
  }
}

i have come to this result:

import scala.actors.Actor
import scala.actors.Actor._

case class Reschedule( millis: Long )
// a parameterless message is better modelled as a case object
case object Awake

val myScheduler2 = new Actor {
  private var millis = 0L
  private var obj: Actor = null
  def act {
    loop {
      receive {
        case x: Reschedule => {
          // remember who asked to be woken up, and after how long
          obj = sender.receiver
          millis = x.millis
          println( "Received reschedule" )
        }
      }
      println( "Scheduler sleeping for " +millis )
      Thread.sleep( millis )
      obj ! Awake
    }
  }
}

myScheduler2.start

val myRoutine2 = new Actor {
  private var count = 0

  def sleep( millis: Long ) {
    myScheduler2 ! Reschedule( millis )
    receive {
      case Awake => println( "awoken" )
    }
  }

  def act {
    loopWhile( count < 20 ) {
      println( "here " + count )
      count = count + 1
      sleep( 666 )
    }
  }
}

myRoutine2.start

note that of course in the proper implementation, the rescheduling
would put the sender on a priorityqueue... this is just for testing.

however, i wonder if this is a) threadsafe, b) efficient, as i
understand the receive and ! use (heavy weight) threads. so i was
trying to do the same using react and !? ... but i think i created an
infinite loop (hang) in the below example. so i had to comment out
the synchronous communication at least in the Reschedule case and go
back to receive and ! ... :

/////////////

import scala.actors.Actor
import scala.actors.Actor._

case class Reschedule( millis: Long )
// parameterless messages are better modelled as case objects
case object Awake
case object Done

val myScheduler3 = new Actor {
  private var millis = 0L
  private var obj: Actor = null
  def act {
    loop {
      receive {
      // react { }
        case x: Reschedule => {
          obj = sender.receiver
          millis = x.millis
          // reply( Done )
          println( "Received reschedule" )
        }
      }
      println( "Scheduler sleeping for " +millis )
      Thread.sleep( millis )
      // synchronous send: blocks until the routine replies with Done
      obj !? Awake
    }
  }
}

myScheduler3.start

val myActor3 = new Actor {
  private var count = 0

  def sleep( millis: Long ) {
    // myScheduler3 !? Reschedule( millis )
    myScheduler3 ! Reschedule( millis )
    react {
      case Awake => reply( Done ); println( "awoken" );
    }
  }

  def act {
    loopWhile( count < 20 ) {
      println( "here " + count )
      count = count + 1
      sleep( 666 )
    }
  }
}

myActor3.start

sorry if that all looks awkward, any hints are appreciated. also i
have no clue about Futures, so maybe a construction using !! would be
more elegant?

also i have a problem with loop and loopWhile. imagine i would want

routine {
  var count = 0
  loopWhile( count < 20 ) {
    println( "Count is " + count ); count = count + 1
    sleep( 666 )
  }
  println( "Loop done!" )
  sleep( 2000 )
  println( "End of routine" )
}

it seems loopWhile kills the actor? at least the println( "Loop
done!" ) is never reached....

thanks, -sciss-
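
A note on the loopWhile question above: in scala.actors, react has result type Nothing and never returns normally, so statements written after a react (or after a loop / loopWhile built on it) are not executed. As far as I know, the usual way to continue is to put the follow-up code inside the handler or to chain it with andThen. The sketch below is a toy model in plain Scala, not the scala.actors implementation; ReactModel, Suspend, start and deliver are made-up names used only to illustrate the control flow.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable

// Toy model only: NOT scala.actors, just the same control-flow idea.
object ReactModel {
  // Thrown to unwind the stack once a handler has been registered.
  private final class Suspend extends ControlThrowable

  private var handler: Option[String => Unit] = None
  val log = ArrayBuffer.empty[String]

  // Like react: stores the handler and never returns normally.
  def react(f: String => Unit): Nothing = {
    handler = Some(f)
    throw new Suspend
  }

  // Runs a piece of "actor" code until it suspends or finishes.
  private def run(code: => Unit): Unit =
    try code catch { case _: Suspend => () }

  // Starts the body of the toy actor.
  def start(body: => Unit): Unit = run(body)

  // Delivers a message to the handler registered by the last react.
  def deliver(msg: String): Unit = {
    val h = handler
    handler = None
    h.foreach(f => run(f(msg)))
  }

  def main(args: Array[String]): Unit = {
    start {
      log += "before react"
      react { msg => log += ("handler got " + msg) }
      log += "after react" // never executed: react has already unwound the stack
    }
    deliver("Awake")
    log.foreach(println)
  }
}
```

Running main logs "before react" and "handler got Awake", but never "after react". In this model the only code that runs after a wake-up is the handler itself, which is why a sleep() based on react cannot simply be followed by more statements.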

Sciss
Re: Again: using actors to schedule things in time

this would also work if i use the asynchronous model with receive,
since i don't need loop in this case:

def sleep( millis: Long ) {
  myScheduler2 ! Reschedule( millis )
  receive {
    case Awake => println( "awoken" )
  }
}

val myRoutine2b = actor {
  for( count <- (0 until 20) ) {
    println( "here " + count )
    sleep( 666 )
  }
  println( "Loop done!" )
  sleep( 2000 )
  println( "Routine done!" )
}

so.... how can i achieve the same thing synchronously?

Colin Bullock
Re: Again: using actors to schedule things in time
This can be implemented easily on top of timeouts:

import scala.actors.Actor._
import scala.actors.TIMEOUT

def sleep(millis: Long) =
  receiveWithin(millis) {
    case TIMEOUT =>
  }


- Colin
Sciss
Re: Again: using actors to schedule things in time

the problem is twofold

- i want a single thread system
- i want to be able to replace system time at some point with logically
  advancing time.

i have come up with this:

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.PriorityQueue

case class Reschedule( delta: Long, who: Actor )
case object Awake
case class Scheduled( when: Long, what: Actor ) extends Ordered[Scheduled] {
  // reversed on purpose: PriorityQueue dequeues the largest element first,
  // so the earliest wake-up time has to compare as the largest
  def compare( that: Scheduled ) = that.when.compare( when )
}
case object Done

object Main {
  var verbose = true
  var sched : MyScheduler = _

  /**
   * @param args the command line arguments
   */
  def main(args: Array[String]) = {
    sched = new MyScheduler
    sched.start
    routine {
      var count = 0;
      println( "Entering routine loop" )
      loopWhile( count < 20 ) {
        println( "Here! " + count )
        sleep( 666 )
        count = count + 1
      }
    }
  }

  def routine( body: => Unit ) {
    val actor = new Actor {
      def act {
        sleep( 0 )
        body
      }
    }
    actor.start
    actor
  }

  def sleep( delta: Long ) {
    sched ! Reschedule( delta, Actor.self )
    react { case Awake => if( verbose ) println( "Awoken" )}
    println( "Schoko" )
  }
}

class MyScheduler extends Actor {
  var verbose = true

  private val queue = new PriorityQueue[Scheduled]
  def act {
    loop {
      val now = System.currentTimeMillis
      if( verbose ) {
        println( "Entering react at " + now )
      }
      react {
        case r: Reschedule => {
          if( verbose ) {
            println( "Received reschedule " + r.delta )
          }
          queue += Scheduled( now + Math.max( 0, r.delta ), r.who )
          while( !queue.isEmpty ) {
            val x = queue.dequeue
            if( verbose ) {
              println( "Dequeued " + x.when )
            }
            if( x.when > now ) {
              if( verbose ) {
                println( "Sleeping for " + (x.when - now) )
              }
              Thread.sleep( x.when - now )
            }
            if( verbose ) {
              // report the actor that was dequeued, not the one that just sent the request
              println( "Awaking actor : " + x.what )
            }
            x.what ! Awake
          }
          if( verbose ) println( "Queue done" )
        }

        case _ => {
          println( "Wooooo" )
        }
      }
    }
  }
}

but the process somehow stops after the initial scheduling:

Entering react at 1234479023924
Received reschedule 0
Dequeued 1234479023924
Awaking actor : scalaapplication1.Main$$anon$1@292cb2
Queue done
Entering react at 1234479023956
Awoken
(then frozen, no reaction, no "Schoko" ....)

what am i doing wrong here...? can it be that difficult?

thanks, -sciss-
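
The two goals stated in this post (a single thread, and logical instead of system time) can also be sketched without actors at all. The following is a minimal sketch in plain Scala, not a scala.actors solution: LogicalScheduler, Entry, sleep and run are hypothetical names, and sleep takes its continuation as an explicit second argument rather than suspending the caller. It assumes that mutable.PriorityQueue dequeues its largest element, which is why the ordering is reversed.

```scala
import scala.collection.mutable

// A sketch of a single-threaded scheduler over logical time (hypothetical names).
final class LogicalScheduler {
  private case class Entry(when: Long, seq: Long, task: () => Unit)

  // mutable.PriorityQueue dequeues the LARGEST element, so the ordering is
  // reversed to get the earliest entry first; seq breaks ties in FIFO order.
  private val queue = mutable.PriorityQueue.empty[Entry](
    Ordering.by[Entry, (Long, Long)](e => (e.when, e.seq)).reverse)

  private var now = 0L
  private var counter = 0L

  // Current logical time in (virtual) milliseconds.
  def time: Long = now

  // Continuation-passing sleep: `next` runs once logical time has advanced by delta.
  def sleep(delta: Long)(next: => Unit): Unit = {
    queue.enqueue(Entry(now + math.max(0L, delta), counter, () => next))
    counter += 1
  }

  // Runs until the queue is empty, jumping the clock instead of blocking a thread.
  def run(): Unit =
    while (queue.nonEmpty) {
      val e = queue.dequeue()
      now = e.when
      e.task()
    }
}

object LogicalSchedulerDemo {
  // Returns (logical time, message) pairs in the order they were produced.
  def demo(): List[(Long, String)] = {
    val sched = new LogicalScheduler
    val log = mutable.ListBuffer.empty[(Long, String)]

    // The loop is written as recursion, because each step continues inside sleep.
    def count(i: Int): Unit =
      if (i < 3) {
        log += ((sched.time, "Here! " + i))
        sched.sleep(666)(count(i + 1))
      } else {
        log += ((sched.time, "Loop done!"))
        sched.sleep(2000) { log += ((sched.time, "End of routine")) }
      }

    sched.sleep(0)(count(0))
    sched.run()
    log.toList
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

The demo finishes immediately and reports logical times 0, 666, 1332, 1998 and 3998. The price is that the code after each sleep has to be passed in as a continuation, which is the same restriction react imposes: in scala.actors, react never returns normally, so a sleep() built on it cannot be followed by ordinary statements.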

Sciss
Re: Again: using actors to schedule things in time

i mean, it's not that i can't define a body at all that works as
expected:

import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.PriorityQueue

case class Reschedule( delta: Long, who: Actor )
case object Awake
case class Scheduled( when: Long, what: Actor ) extends Ordered[Scheduled] {
  // reversed on purpose: PriorityQueue dequeues the largest element first,
  // so the earliest wake-up time has to compare as the largest
  def compare( that: Scheduled ) = that.when.compare( when )
}

object Main extends Application {
  var verbose = true
  var sched : MyScheduler = _

  /**
   * @param args the command line arguments
   */
  override def main(args: Array[String]) = {
    sched = new MyScheduler
    sched.start
    actor {
      var count = 0;
      println( "Entering routine loop" )
      sched ! Reschedule( 0, Actor.self )
      loopWhile( count < 20 ) {
        // all follow-up work lives inside the react handler
        react { case Awake =>
          println( "Here! " + count + "; " + Thread.currentThread.hashCode )
          sched ! Reschedule( 666, Actor.self )
          count = count + 1
        }
      }
    }
  }
}

but this way i cannot place sleep() calls wherever i want, plus the
code becomes very ugly and verbose.

thanks for hints!

best, -sciss-


Copyright © 2012 École Polytechnique Fédérale de Lausanne (EPFL), Lausanne, Switzerland