- About Scala
- Documentation
- Code Examples
- Software
- Scala Developers
Actors, Threading, Jobs
Fri, 2009-03-13, 03:25
I have N jobs to do, and I'd like to distribute them over M processors
(and N>M). It's my first time using the Actors paradigm for
concurrency, and I'm not sure whether this solution is as simple as
possible, or whether it is correct for all scenarios. Please let me
know if you have a better way of doing this, if there is already high
level support for this in the scala API, if there is documentation
anywhere on the web for this kind of problem, or if you have recommended
fixes for my implementation. Thanks for your help.
Sam Reid
package testactors
import _root_.scala.actors.{Actor}
import _root_.scala.collection.mutable.ArrayBuffer
import java.io.File
import scala.actors.Actor._
object TestActors {
def main(args: Array[String]) {
case class Job(folder: File, index: Int)
val jobs = Job(new File("."), 3) :: Job(new File("."), 4) :: Job(new
File("."), 5) :: Job(new File("."), 333) :: Job(new File("."), 234) :: Nil
val scheduled = new ArrayBuffer[Job]
val done = new ArrayBuffer[Job]
def nextProblem() = {
val j = jobs.toList.remove(p => scheduled.contains(p))
if (j.length > 0) j(0) else null
}
val myScheduler = actor{
while (done.length < jobs.length) {
def next(worker: Actor) = {
if (done.length < jobs.length) {
val newJob = nextProblem()
if (newJob != null) {
scheduled += newJob
worker ! newJob
} else {
worker ! "done"
}
}
else {
worker ! "done"
}
}
receive{
case (worker: Worker) => next(worker)
case (worker: Worker, job: Job) => {
done += job
next(worker)
}
}
}
println("Scheduler finished, all " + done.length + " Jobs are done")
exit()
}
class Worker(_index: Int) extends Actor {
val index = _index
def act() = {
while (true) {
receive{
case "done" => exit()
case job: Any => {
println("actor " + index + " starting job: " + job)
Thread.sleep(0)
myScheduler ! (this, job)
}
}
}
}
start()
}
val actorList = for (i <- 1 to 3) yield new Worker(i)
for (actor <- actorList) myScheduler ! actor //tell scheduler we are
ready to start
}
}
Fri, 2009-03-13, 08:47
#2
Re: Actors, Threading, Jobs
On Fri, Mar 13, 2009 at 11:18 AM, Samuel Robert Reid wrote:
> Here's an incremental improvement on the snippet in my last email that
> distributed N independent jobs to M actors. This version uses generics and
> an abstraction for the algorithm (assumes same algorithm is run for all
> tasks). I've put the new code below, but I'm still betting you have a
> solution that's much more elegant than this one.
Well, here is my solution. Logic is almost the same and there is a
mutable state similar to yours in scheduler:
package new_job
import scala.actors.{Actor}
import java.io.File
import scala.actors.Actor._
case class Job(folder: File, index: Int)
class MyScheduler(var jobs: List[Job]) extends Actor {
var done = List[Job]()
var scheduled = List[Job]()
def act() = loop {
receive {
case worker: Worker => next(worker)
case (worker: Worker,job: Job) => {
println("Job is done : " + job)
done = job :: done
scheduled = scheduled - job
next(worker)
}
case _ => //
}
}
def next(worker: Actor) = if(jobs.isEmpty) {
worker ! "done"
if(scheduled.isEmpty) {
println("All work is done " + done.length)
exit()
}
} else {
val t_job = jobs.head
jobs = jobs.tail
scheduled = t_job :: scheduled
worker ! t_job
}
}
class Worker(val index: Int) extends Actor {
def act() = loop {
receive {
case "done" => exit()
case job: Job => {
println("actor " + index + " starting job: " + job)
Thread.sleep(0)
reply((this, job))
}
}
}
start()
}
object App {
def main(args: Array[String]) = {
val jobs = Job(new File("."), 3) :: Job(new File("."), 4) ::
Job(new File("."), 5) :: Job(new File("."), 333) :: Job(new File("."),
234) :: Nil
val scheduler = new MyScheduler(jobs)
scheduler.start()
for(i <- 1 to 3) scheduler ! new Worker(i)
}
}
(or http://gist.github.com/78471 if formatting is screwed)
There is no need of while(true) in act() method. Also if your work is
not IO bound you can use react() in stead of receive() for Event based
Actors.
I can possibly get rid of mutable state in above snippet using
recursion but somehow solution seems long winded that way.
>
> Thanks,
> Sam Reid
>
> import _root_.scala.actors.Actor
> import java.io.File
> import _root_.scala.collection.mutable.ArrayBuffer
>
> trait Job
> class Scheduler[T <: Job](_jobs: List[T], numWorkers: Int, algorithm: (T) =>
> Unit) extends Actor {
> val jobs = _jobs
> val todo = new ArrayBuffer[T]
> todo ++= jobs
> val done = new ArrayBuffer[T]
>
> def nextProblem(): Option[T] = if (todo.length > 0) Some[T](todo(0)) else
> None
>
> def act() = {
> while (done.length < jobs.length) {
> def next(worker: Actor) = {
> if (done.length < jobs.length) {
> val newJob = nextProblem()
> if (newJob.isDefined) {
> todo -= newJob.get
> worker ! newJob.get
> } else {
> worker ! "done"
> }
> }
> else {
> worker ! "done"
> }
> }
> receive{
> case (worker: Worker[T]) => next(worker)
> case (worker: Worker[T], job: T) => {
> done += job
> next(worker)
> }
> }
> }
> println("Scheduler finished, all " + done.length + " Jobs are done")
> exit()
> }
>
> val workers = for (i <- 1 to numWorkers) yield new Worker(i, this,
> algorithm)
>
> override def start() = {
> for (actor <- workers) this ! actor //tell scheduler we are ready to start
> super.start()
> }
> }
> class Worker[T <: Job](_index: Int, _sch: Scheduler[T], algorithm: (T) =>
> Unit) extends Actor {
> val index = _index
> val sch = _sch
>
> def act() = {
> while (true) {
> receive{
> case "done" => exit()
> case job: T => {
> algorithm(job)
> sch ! (this, job)
> }
> }
> }
> }
> start()
> }
>
> Samuel Robert Reid wrote:
>>
>> I have N jobs to do, and I'd like to distribute them over M processors
>> (and N>M). It's my first time using the Actors paradigm for concurrency,
>> and I'm not sure whether this solution is as simple as possible, or whether
>> it is correct for all scenarios. Please let me know if you have a better
>> way of doing this, if there is already high level support for this in the
>> scala API, if there is documentation anywhere on the web for this kind of
>> problem, or if you have recommended fixes for my implementation. Thanks for
>> your help.
>>
>> Sam Reid
>>
>> [...]
>
Here's an incremental improvement on the snippet in my last email that
distributed N independent jobs to M actors. This version uses generics
and an abstraction for the algorithm (assumes same algorithm is run for
all tasks). I've put the new code below, but I'm still betting you have
a solution that's much more elegant than this one.
Thanks,
Sam Reid
import _root_.scala.actors.Actor
import java.io.File
import _root_.scala.collection.mutable.ArrayBuffer
trait Job
class Scheduler[T <: Job](_jobs: List[T], numWorkers: Int, algorithm:
(T) => Unit) extends Actor {
val jobs = _jobs
val todo = new ArrayBuffer[T]
todo ++= jobs
val done = new ArrayBuffer[T]
def nextProblem(): Option[T] = if (todo.length > 0) Some[T](todo(0))
else None
def act() = {
while (done.length < jobs.length) {
def next(worker: Actor) = {
if (done.length < jobs.length) {
val newJob = nextProblem()
if (newJob.isDefined) {
todo -= newJob.get
worker ! newJob.get
} else {
worker ! "done"
}
}
else {
worker ! "done"
}
}
receive{
case (worker: Worker[T]) => next(worker)
case (worker: Worker[T], job: T) => {
done += job
next(worker)
}
}
}
println("Scheduler finished, all " + done.length + " Jobs are done")
exit()
}
val workers = for (i <- 1 to numWorkers) yield new Worker(i, this,
algorithm)
override def start() = {
for (actor <- workers) this ! actor //tell scheduler we are ready to
start
super.start()
}
}
class Worker[T <: Job](_index: Int, _sch: Scheduler[T], algorithm: (T)
=> Unit) extends Actor {
val index = _index
val sch = _sch
def act() = {
while (true) {
receive{
case "done" => exit()
case job: T => {
algorithm(job)
sch ! (this, job)
}
}
}
}
start()
}
Samuel Robert Reid wrote:
> I have N jobs to do, and I'd like to distribute them over M processors
> (and N>M). It's my first time using the Actors paradigm for
> concurrency, and I'm not sure whether this solution is as simple as
> possible, or whether it is correct for all scenarios. Please let me
> know if you have a better way of doing this, if there is already high
> level support for this in the scala API, if there is documentation
> anywhere on the web for this kind of problem, or if you have
> recommended fixes for my implementation. Thanks for your help.
>
> Sam Reid
>
> [...]