This page is no longer maintained — Please continue to the home page at www.scala-lang.org

Akka and Future

3 replies
Mic
Joined: 2011-11-23,
User offline. Last seen 42 years 45 weeks ago.
Hello,I could not find so many examples for AKKA's Future, but I found a following Python solution ( http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html )
import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
and as output you will get:
$ python -u multiprocessing_producer_consumer.py

Creating 4 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-3: 4 * 4
Consumer-4: 5 * 5
Consumer-2: 6 * 6
Consumer-1: 7 * 7
Consumer-3: 8 * 8
Consumer-4: 9 * 9
Consumer-2: Exiting
Consumer-1: Exiting
Consumer-3: Exiting
Consumer-4: Exiting
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 1 * 1 = 1
Result: 0 * 0 = 0
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 8 * 8 = 64
Result: 9 * 9 = 81
How is it possible to get something like that with AKKA and Future?
Thank you in advance.






Viktor Klang
Joined: 2008-12-17,
User offline. Last seen 1 year 27 weeks ago.
Re: Akka and Future
Hi Mic,

Was there something dissatisfying with the Akka documentation?

http://akka.io/docs/akka/1.3-RC1/

Cheers,


On Wed, Nov 23, 2011 at 12:53 AM, Mic <mictadlo@gmail.com> wrote:
Hello,I could not find so many examples for AKKA's Future, but I found a following Python solution ( http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html )
import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
and as output you will get:
$ python -u multiprocessing_producer_consumer.py

Creating 4 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-3: 4 * 4
Consumer-4: 5 * 5
Consumer-2: 6 * 6
Consumer-1: 7 * 7
Consumer-3: 8 * 8
Consumer-4: 9 * 9
Consumer-2: Exiting
Consumer-1: Exiting
Consumer-3: Exiting
Consumer-4: Exiting
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 1 * 1 = 1
Result: 0 * 0 = 0
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 8 * 8 = 64
Result: 9 * 9 = 81
How is it possible to get something like that with AKKA and Future?
Thank you in advance.









--
Viktor Klang

Akka Tech LeadTypesafe - Enterprise-Grade Scala from the Experts

Twitter: @viktorklang
Derek Williams 3
Joined: 2011-08-12,
User offline. Last seen 42 years 45 weeks ago.
Re: Akka and Future
On Tue, Nov 22, 2011 at 4:53 PM, Mic <mictadlo@gmail.com> wrote:
How is it possible to get something like that with AKKA and Future?

untested, but something like this would work if you want a solution with just Future.
// send tasks to the dispatherval results = Future.traverse(List.range(0,10))(i => Future("%d x %d = %d" format (i, i, i*i)))
// wait for completion and print results results.get foreach println
--
Derek Williams
Mic
Joined: 2011-11-23,
User offline. Last seen 42 years 45 weeks ago.
Re: Akka and Future
Thank you

On Thu, Nov 24, 2011 at 2:44 AM, Derek Williams <derek@fyrie.net> wrote:
On Tue, Nov 22, 2011 at 4:53 PM, Mic <mictadlo@gmail.com> wrote:
How is it possible to get something like that with AKKA and Future?

untested, but something like this would work if you want a solution with just Future.
// send tasks to the dispatherval results = Future.traverse(List.range(0,10))(i => Future("%d x %d = %d" format (i, i, i*i)))
// wait for completion and print results results.get foreach println
--
Derek Williams

Copyright © 2012 École Polytechnique Fédérale de Lausanne (EPFL), Lausanne, Switzerland