- About Scala
- Documentation
- Code Examples
- Software
- Scala Developers
actors (again)
Wed, 2009-02-25, 12:53
Folks,
I'd appreciate id the concurrency gurus among us
could comment on this implementation of actors.
It looks almost to simple to be correct.
It works for simple things like a ping-pong game.
I'd be interested in more complex use cases to test the
implementation with.
No time-out functionality is implemented, only (send/receive).
ps: I'm using List to implement queue-like behavior
[ not very efficient, I agree ].
ps: I use a class SyncVar which similar to (but different from)
the one in ScalaByExample [ no while loop needed because
(I think) I'm sure that the right thread is notified ]
== library ==
package util
class SyncVar[X] {
private[this] var _x : X = _
def set(x: X) = synchronized {
_x = x
notify
}
def get = synchronized {
wait
_x
}
}
trait Partial[X] {
val exists: X => Boolean
}
class PartialSyncVar[X](p: X => Boolean) extends SyncVar[X] with Partial[X] {
val exists = p
}
abstract class Actor[X,Y] extends Runnable {
private var xs : List[X] = Nil
private var psvs : List[PartialSyncVar[X]] = Nil
def act() = run()
def !(x: X) = {
val lpsvs = psvs.filter(_.exists(x))
if(!lpsvs.isEmpty) {
val psv = lpsvs.last
psvs = psvs.filter(_ != psv)
psv set x
} else {
xs = x :: xs
}
}
def receive(pf: PartialFunction[X,Y]) = {
val exists = pf.isDefinedAt(_)
val x = {
val lxs = xs.filter(exists)
if(!lxs.isEmpty) {
val lx = lxs.last
xs = xs.filter(_ != lx)
lx
} else {
val psv = new PartialSyncVar[X](exists)
psvs = psv :: psvs
psv get
}
}
pf(x)
}
}
== application ==
import util.Actor
sealed abstract class PingPong
case class Ping(actor: Actor[PingPong,Unit]) extends PingPong
case class Pong(actor: Actor[PingPong,Unit]) extends PingPong
case class Over(actor: Actor[PingPong,Unit]) extends PingPong
class PingPongActor(name: String) extends Actor[PingPong,Unit] {
var isPlaying = true
def pong(actor: Actor[PingPong,Unit]) {
println(Pong(this))
actor ! Pong(this)
}
def ping(actor: Actor[PingPong,Unit]) {
println(Ping(this))
actor ! Ping(this)
}
private def over(actor: Actor[PingPong,Unit]) {
isPlaying = false
print (Over(this))
actor ! Over(this)
}
private def done() {
isPlaying = false
println(" and Done("+this+")")
}
override def toString = name
override def run() {
while(isPlaying) {
receive {
case Ping(actor) => if (Math.random < 0.9) {
pong(actor)
} else {
over(actor)
}
case Pong(actor) => if (Math.random < 0.9) {
ping(actor)
} else {
over(actor)
}
case Over(actor) => {
done()
}
}
}
}
}
object Main01 {
def main(args: Array[String]) {
val jan = new PingPongActor("jan")
val jos = new PingPongActor("jos")
new Thread(jan).start()
new Thread(jos).start()
if(Math.random < 0.5) {
jos ping jan
} else {
jan ping jos
}
}
}
--
__~O
-\ <,
(*)/ (*)
reality goes far beyond imagination
I'd appreciate id the concurrency gurus among us
could comment on this implementation of actors.
It looks almost to simple to be correct.
It works for simple things like a ping-pong game.
I'd be interested in more complex use cases to test the
implementation with.
No time-out functionality is implemented, only (send/receive).
ps: I'm using List to implement queue-like behavior
[ not very efficient, I agree ].
ps: I use a class SyncVar which similar to (but different from)
the one in ScalaByExample [ no while loop needed because
(I think) I'm sure that the right thread is notified ]
== library ==
package util
class SyncVar[X] {
private[this] var _x : X = _
def set(x: X) = synchronized {
_x = x
notify
}
def get = synchronized {
wait
_x
}
}
trait Partial[X] {
val exists: X => Boolean
}
class PartialSyncVar[X](p: X => Boolean) extends SyncVar[X] with Partial[X] {
val exists = p
}
abstract class Actor[X,Y] extends Runnable {
private var xs : List[X] = Nil
private var psvs : List[PartialSyncVar[X]] = Nil
def act() = run()
def !(x: X) = {
val lpsvs = psvs.filter(_.exists(x))
if(!lpsvs.isEmpty) {
val psv = lpsvs.last
psvs = psvs.filter(_ != psv)
psv set x
} else {
xs = x :: xs
}
}
def receive(pf: PartialFunction[X,Y]) = {
val exists = pf.isDefinedAt(_)
val x = {
val lxs = xs.filter(exists)
if(!lxs.isEmpty) {
val lx = lxs.last
xs = xs.filter(_ != lx)
lx
} else {
val psv = new PartialSyncVar[X](exists)
psvs = psv :: psvs
psv get
}
}
pf(x)
}
}
== application ==
import util.Actor
sealed abstract class PingPong
case class Ping(actor: Actor[PingPong,Unit]) extends PingPong
case class Pong(actor: Actor[PingPong,Unit]) extends PingPong
case class Over(actor: Actor[PingPong,Unit]) extends PingPong
class PingPongActor(name: String) extends Actor[PingPong,Unit] {
var isPlaying = true
def pong(actor: Actor[PingPong,Unit]) {
println(Pong(this))
actor ! Pong(this)
}
def ping(actor: Actor[PingPong,Unit]) {
println(Ping(this))
actor ! Ping(this)
}
private def over(actor: Actor[PingPong,Unit]) {
isPlaying = false
print (Over(this))
actor ! Over(this)
}
private def done() {
isPlaying = false
println(" and Done("+this+")")
}
override def toString = name
override def run() {
while(isPlaying) {
receive {
case Ping(actor) => if (Math.random < 0.9) {
pong(actor)
} else {
over(actor)
}
case Pong(actor) => if (Math.random < 0.9) {
ping(actor)
} else {
over(actor)
}
case Over(actor) => {
done()
}
}
}
}
}
object Main01 {
def main(args: Array[String]) {
val jan = new PingPongActor("jan")
val jos = new PingPongActor("jos")
new Thread(jan).start()
new Thread(jos).start()
if(Math.random < 0.5) {
jos ping jan
} else {
jan ping jos
}
}
}
--
__~O
-\ <,
(*)/ (*)
reality goes far beyond imagination