[reactive-core] limit rate of firing but remember last value
Sat, 2012-01-14, 13:49
Hi list,
I'm trying to write a function that takes an EventStream[Double] and, after it has fired one value, does not let it output again before t seconds have passed, but also keeps the last value that arrived in the meantime and outputs that value once the t seconds are over. I have tried to think of ways of doing this but haven't managed it yet.
So far I have managed to write a function that limits the rate of firing but does not remember values that arrived in the meantime:
def slowdown(a: EventStream[Double], t: Double = 1.0) =
  a.foldLeft( ((0L, 0.0), true) ) {
    case ( ((tOutputed, xOutputed), _), x ) =>
      val newT = System.currentTimeMillis()
      val newV = (newT, x)
      val shouldFire = (newT - tOutputed) >= t*1000
      (if (shouldFire) newV else (tOutputed, xOutputed), shouldFire)
  }.collect { case ((t, x), true) => x }
I have also managed to write a function that remembers the values that arrive after a value has been output and fires the last of them after t seconds. However, it does not start counting another t seconds after the stored value is fired, so a new value may arrive and be fired immediately. I think that instead of the Delay it should have a Timer, and this timer should keep firing the incoming values at t-second intervals, but stop if no value has arrived in the last t seconds. It seems to me that the state that tracks when an event was last fired is needed both at the beginning of the network and towards the end (to stop the timer), so maybe some kind of recursion is needed here, but I can't see how to do it.
def delay(a: EventStream[Double], t: Double = 1.0) = {
  val x = a.foldLeft( ((0L, 0.0), (0L, 0.0)) ) {
    case ( ((tOutputed, xOutputed), (t1, x1)), x2 ) =>
      val t2 = System.currentTimeMillis()
      //println( t2 - tOutputed )
      ( if ((t2 - tOutputed) >= t*1000) (t2, x2) else (tOutputed, xOutputed), (t2, x2) )
  }
  // value arrived after delay time t had passed, doesn't need to be delayed
  val noDelay = x.collect { case (v1, v2) if v1 == v2 => v2._2 }
  // timer starts counting from last outputted value
  val timer = noDelay.flatMap { x => new Delay((), (t*1000).toLong) }
  // value arrived before delay time t had passed, select it to be delayed
  val valueToDelay = x.collect { case (v1, v2) if v1 != v2 => v2._2 }
  // keep "storing" the values with flatMap, output the last one when the timer fires
  val delayedValue = valueToDelay.flatMap { x => timer.map(w => x) }
  noDelay | delayedValue
}
where Delay is just a stream that waits t seconds and then fires:

import reactive._
import java.util.{Timer => juTimer, TimerTask}

private object _delay extends juTimer {
  def schedule(delay: Long)(p: => Unit): TimerTask = {
    val tt = new java.util.TimerTask { def run = p }
    super.schedule(tt, delay)
    tt
  }
}

/**
 * An EventStream that fires one event with a given delay.
 */
class Delay[A](val value: A, delay: Long = 1000) extends EventSource[A] {
  private val tt: TimerTask = _delay.schedule(delay) { fire(value) }
}
Can anyone help me out with how to write this function?
best regards,
Miguel Negrão
Sun, 2012-01-15, 13:31
#2
Re: [reactive-core] limit rate of firing but remember last valu
Hi nafg,
I don't see how the list would help... I don't really need to keep all past values, just need to know the last outputted value and at what time it was outputted and the last stored value, I think.
Let me reformulate what I want. It's an EventStream that, given a value 'delta' (in seconds), acts like this:
1 - If nothing has been fired for the last delta seconds, when a new event arrives at t0 it is fired immediately.
2 - If after that event a new event arrives at t1, where t1 - t0 < delta, it stores that event and fires it at t0 + delta.
3 - If multiple events arrive between t0 and t0 + delta, only the last of them is fired at t0 + delta.
4 - If at least one event arrived between t0 and t0 + delta and was fired at t0 + delta, the process starts again at 2; that is, the last value that arrived between t0 + delta and t0 + 2*delta is fired at t0 + 2*delta. If no events arrive between t0 + delta and t0 + 2*delta, go back to 1.
It's like polling every delta seconds but, unlike polling, nothing is fired if nothing has arrived during the last delta-second block of time.
This would be quite easy to do in imperative style. In pseudo-code it would look something like this:
val delta
var lastValue
var lastTime
var lastOutputtedTime

{ |x| =>
  lastTime = System.currentTimeMillis
  lastValue = x
  if( (lastTime - lastOutputtedTime) > delta ) {
    lastOutputtedTime = lastTime
    startLoop {
      sendOutput(lastValue)
      wait(delta)
      if( lastTime == lastOutputtedTime ) stopThisLoop
      else { lastOutputtedTime = System.currentTimeMillis }
    }
  }
}
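A runnable version of the pseudo-code above might look like the following sketch. It assumes a plain `java.util.Timer` rather than anything from reactive-core; the class `RateLimiter` and the names `offer` and `emit` are hypothetical, and delta is taken in milliseconds. Instead of comparing timestamps, it tracks whether a delta window is currently open.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical helper, not part of reactive-core.
// emit is called immediately for the first value, then at most once per
// deltaMs with the latest value that arrived during the window.
final class RateLimiter[A](deltaMs: Long, emit: A => Unit) {
  // Daemon timer: pending values are lost if the JVM exits first.
  private val timer = new Timer(true)
  private var pending: Option[A] = None
  private var windowOpen = false // true while a delta window is running

  def offer(x: A): Unit = synchronized {
    if (windowOpen) {
      pending = Some(x) // rules 2 and 3: keep only the latest value
    } else {
      emit(x) // rule 1: nothing fired recently, fire at once
      startWindow()
    }
  }

  private def startWindow(): Unit = {
    windowOpen = true
    timer.schedule(new TimerTask { def run(): Unit = onWindowEnd() }, deltaMs)
  }

  private def onWindowEnd(): Unit = synchronized {
    pending match {
      case Some(x) =>
        pending = None
        emit(x) // rule 4: fire the stored value and open a new window
        startWindow()
      case None =>
        windowOpen = false // nothing arrived: back to rule 1
    }
  }
}
```

The mutable state is confined to the class, so callers only see `offer` going in and `emit` coming out.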
best,Miguel Negrão
On Sunday, January 15, 2012 9:14:17 AM UTC, nafg wrote:
Perhaps instead of foldLefting into a tuple, queue the values into a list. Something like:
def pulses[A](es: EventStream[A], t: Double = 1000.0): EventStream[List[A]] =
  es.map(_ -> System.currentTimeMillis).foldLeft((List.empty[(A,Long)], List.empty[(A,Long)])) {
    case ((old, cur), a) => ...
(sorry I don't have time to work on it more now --- if that's not enough let me know.)
Wed, 2012-01-25, 14:41
#3
Re: [reactive-core] limit rate of firing but remember last valu
Hi
Just to make sure I don't lose more time on this needlessly, could it be that what I'm trying to do is just not possible with reactive-core?
Regards,Miguel
Thu, 2012-01-26, 06:51
#4
Re: [reactive-core] limit rate of firing but remember last valu
Hi, sorry I haven't responded yet. I can't imagine that it's not possible (and if I understand correctly, it's fundamental enough that if somehow it's not possible, then whatever changes need to be made to reactive to allow it should be made).
Part of the problem is that lately I've been online infrequently for short periods of time (during which I had other things that needed to get done), and I neglected to save your question to look at it offline. I hope to put the time into it later tonight, but I'm sending this in the meantime in case you check your email before then or I end up not being able to tonight.
To clarify (I haven't yet read your question properly), is the idea that, e.g., if you have a text box firing change events that trigger a web service lookup, you might want to discard values that occur within a short amount of time before the next value so as not to perform lookups that will immediately be discarded? Rx.NET calls that Throttle, and IIRC the SWT forms library has such a thing also.
Let's put the question this way: imagine the algorithm was not being applied to values occurring in time but you were calculating it retroactively on a collection. So you have a List[(Long, Double)] (sorted ascending by the Long value), and you need to drop elements that are superseded within a close enough Long value --- are you able to get that working?
Again, I plan to read your question properly and try to come up with a real answer later tonight, with G-d's help.
Fri, 2012-01-27, 01:01
#5
Re: [reactive-core] limit rate of firing but remember last valu
Hi nafg,
On Thursday, January 26, 2012 5:41:50 AM UTC, nafg wrote:
Hi, sorry I haven't responded yet. I can't imagine that it's not possible (and if I understand correctly, it's fundamental enough that if somehow it's not possible, then whatever changes need to be made to reactive to allow it should be made).
Part of the problem is that lately I've been online infrequently for short periods of time (during which I had other things that needed to get done), and I neglected to save your question to look at it offline. I hope to put the time into it later tonight, but I'm sending this in the meantime in case you check your email before then or I end up not being able to tonight.
No problem, any help, at whatever time interval, is always welcome. :-)
To clarify (I haven't yet read your question properly), is the idea that, e.g., if you have a text box firing change events that trigger a web service lookup, you might want to discard values that occur within a short amount of time before the next value so as not to perform lookups that will immediately be discarded? Rx.NET calls that Throttle, and IIRC the SWT forms library has such a thing also.
Well, just discarding the value is easy; it's what I implemented in the slowdown method in a previous post. What I want is a variation on that where a value is not discarded but kept, and if no other new value comes, it is output after the waiting period is over. I attach an image with a diagram that should make clear what I want to get.
Let's put the question this way: imagine the algorithm was not being applied to values occurring in time but you were calculating it retroactively on a collection. So you have a List[(Long, Double)] (sorted ascending by the Long value), and you need to drop elements that are superseded within a close enough Long value --- are you able to get that working?
Yes, I can. This does what I want:

val waitTime = 5
val list = List( (0,1), (2,3), (3,2), (4,7), (12,3), (13,4) )
val x = list.sliding(2).toList.foldLeft(List.empty[(Int, Int)]) {
  case (v, a) if v.size == 0 => List( a(0) )
  case (v, (t0,x0)::(t1,x1)::tail) =>
    if( (t0 - v.last._1) > waitTime ) { v :+ (t0,x0) }
    else if( (t1 - v.last._1) > waitTime ) { v :+ (v.last._1 + waitTime, x0) }
    else v
  case (v, _) => v
}
//special check for last item
val y = if( (list.last._1 - x.last._1) > waitTime ) { x :+ list.last }
        else { x :+ (x.last._1 + waitTime, list.last._2) }
println(y)
result:
List((0,1), (5,7), (12,3), (17,4))
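For comparison, the four rules from the earlier message can also be written as a single left fold over the timestamped list. This is only a sketch under assumptions: the names `RateLimitSketch`, `rateLimit`, and `State` are made up for illustration and are not part of reactive-core, timestamps are assumed to be sorted ascending, and an event arriving exactly delta after the last output is fired immediately (rule 2 stores only when t1 - t0 < delta), whereas the code above uses a strict `>` comparison.

```scala
object RateLimitSketch {
  // Fold state: outputs so far (newest first), the end of the current
  // blocking window if any, and the latest value stored during that window.
  private final case class State(
      out: List[(Long, Double)],
      windowEnd: Option[Long],
      pending: Option[Double])

  // If a stored value was due at or before `now`, emit it at the window end
  // and open a new window starting there (rule 4).
  private def flush(s: State, now: Long, delta: Long): State =
    (s.windowEnd, s.pending) match {
      case (Some(w), Some(p)) if w <= now =>
        State((w, p) :: s.out, Some(w + delta), None)
      case _ => s
    }

  def rateLimit(events: List[(Long, Double)], delta: Long): List[(Long, Double)] = {
    val end = events.foldLeft(State(Nil, None, None)) { case (s0, (t, x)) =>
      val s = flush(s0, t, delta)
      s.windowEnd match {
        // Inside a window: remember only the latest value (rules 2 and 3).
        case Some(w) if t < w => s.copy(pending = Some(x))
        // Outside any window: fire immediately and open one (rule 1).
        case _ => State((t, x) :: s.out, Some(t + delta), None)
      }
    }
    // A value still stored after the last event fires at its window end.
    val all = (end.windowEnd, end.pending) match {
      case (Some(w), Some(p)) => (w, p) :: end.out
      case _ => end.out
    }
    all.reverse
  }
}
```

Called on the sample list above with delta = 5 (values written as Doubles), it produces the same four output events.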
best,Miguel
Tue, 2012-01-31, 10:01
#6
Re: [reactive-core] limit rate of firing but remember last valu
Sorry I didn't respond sooner. I didn't end up getting to it until the next day. I added a throttle method to EventStream which is being pushed to github right now. Let me know if it does what you want.
To be clear, what it does is this: every time an event is received, it records it in a var and resets a timer (I used your code) that fires the value. If a value is received before the timer fires, the timer is of course reset, and the new value is recorded for the timer to fire. So events will always be received at the event time plus the delay time.
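The behaviour described here (record the value, restart the timer on every event) can be sketched outside the library as follows. This is not the code that was pushed to reactive; `Debouncer`, `offer`, and `emit` are hypothetical names, and a plain `java.util.Timer` is assumed.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical helper, not part of reactive-core.
// Every offered value cancels the previously scheduled task, so a value is
// emitted only once delayMs have passed without a newer value arriving.
final class Debouncer[A](delayMs: Long, emit: A => Unit) {
  // Daemon timer: a scheduled value is lost if the JVM exits first.
  private val timer = new Timer(true)
  private var scheduled: Option[TimerTask] = None

  def offer(x: A): Unit = synchronized {
    scheduled.foreach(_.cancel()) // reset the timer
    val task = new TimerTask { def run(): Unit = emit(x) }
    scheduled = Some(task)
    timer.schedule(task, delayMs) // fire at event time plus delay
  }
}
```

With this scheme, a steady stream of events arriving faster than the delay keeps postponing the output, which is the difference from the fixed-window behaviour asked for earlier in the thread.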
Thu, 2012-02-09, 17:01
#7
Re: [reactive-core] limit rate of firing but remember last valu
On Tuesday, January 31, 2012 8:59:29 AM UTC, nafg wrote:
Sorry I didn't respond sooner. I didn't end up getting to it until the next day. I added a throttle method to EventStream which is being pushed to github right now. Let me know if it does what you want.
To be clear, what it does is this: every time an event is received, it records it in a var and resets a timer (I used your code) that fires the value. If a value is received before the timer fires, the timer is of course reset, and the new value is recorded for the timer to fire. So events will always be received at the event time plus the delay time.
Hi
Sorry for the delay on my part, this time.
Looking at the code, it seems to me it's not quite what I wanted. You are resetting the timer every time a new event arrives; instead, the timer should be reset only when an event is fired by the Throttled instance. I don't know how to check if a timer is still active, but what I would do would be something like:
class Throttled(delay: Long) extends ChildEventSource[T, (Option[T], Long)](None -> System.currentTimeMillis) {
  override def debugName = EventSource.this.debugName+".throttle("+delay+")"
  private def onTimer {
    val t1 = System.currentTimeMillis
    state._1 match {
      case Some(e) => fire(e)
      case _ =>
    }
    state = None -> t1
  }
  var tt = _timer.schedule(delay)(onTimer)
  def handler = {
    case (event, _) =>
      if( tt has already waited and run its action, i.e. it's not waiting to do something ) {
        fire(event)
        tt = _timer.schedule(delay)(onTimer)
      } else Some(event) -> System.currentTimeMillis
  }
}
class Throttled(delay: Long) extends ChildEventSource[T, (Option[T], Long)](None -> System.currentTimeMillis) {
  ...
  def handler = {
    case (event, _) =>
      if( tt has already waited and run its action, i.e. it's not waiting to do something ) {
        fire(event)
        tt = _timer.schedule(delay)(onTimer)
      }
      Some(event) -> System.currentTimeMillis
  }
}
Also, running a test I don't get what I expect to get from the current code:
import reactive._

object ExampleReactiveThrottle extends Observing {
  def main(args: Array[String]) {
    val x = new EventSource[Double]
    val result = x.throttle(1000L)
    result.foreach(println)
    x.fire(2.0)
    Thread.sleep(3000L)
    x.fire(1.0)
  }
}
I get just 2.0 instead of 2.0, 1.0.
But my original question was whether it is possible to do this in a functional way, using just the language of event streams and signals, so I guess it's not possible, no? The throttled method is using a var, so I guess it's not functional.
best,Miguel
Mon, 2012-02-13, 11:31
#8
Re: [reactive-core] limit rate of firing but remember last valu
On Thu, Feb 9, 2012 at 10:52 AM, Miguel Negrão <miguel.negrao-lists@friendlyvirus.org> wrote:
On Tuesday, January 31, 2012 8:59:29 AM UTC, nafg wrote:
Sorry I didn't respond sooner. I didn't end up getting to it until the next day. I added a throttle method to EventStream which is being pushed to github right now. Let me know if it does what you want.
To be clear, what it does is this: every time an event is received, it records it in a var and resets a timer (I used your code) that fires the value. If a value is received before the timer fires, the timer is of course reset, and the new value is recorded for the timer to fire. So events will always be received at the event time plus the delay time.
Hi
Sorry for the delay on my part, this time.
Looking at the code, it seems to me it's not quite what I wanted. You are resetting the timer every time a new event arrives; instead, the timer should be reset only when an event is fired by the Throttled instance. I don't know how to check if a timer is still active, but what I would do would be something like:
class Throttled(delay: Long) extends ChildEventSource[T, (Option[T], Long)](None -> System.currentTimeMillis) {
  override def debugName = EventSource.this.debugName+".throttle("+delay+")"
  private def onTimer {
    val t1 = System.currentTimeMillis
    state._1 match {
      case Some(e) => fire(e)
      case _ =>
    }
    state = None -> t1
  }
  var tt = _timer.schedule(delay)(onTimer)
  def handler = {
    case (event, _) =>
      if( tt has already waited and run its action, i.e. it's not waiting to do something ) {
        fire(event)
        tt = _timer.schedule(delay)(onTimer)
      } else Some(event) -> System.currentTimeMillis
  }
}
Not sure I understand the code. First of all, did you change anything besides what I made bold? Second, you still have a call to fire in onTimer, so it's being called from two places. Third, I'm not sure what "its action" means (if not firing the event, in which case why fire it again?). So I still don't understand what you want it to do. :(
Maybe what you want is that the timer should block further events? So:
event e1 comes at time t1, start timer.
event e2 comes at time t2, drop it.
Timer runs, e1 is fired.
What is the use case?
class Throttled(delay: Long) extends ChildEventSource[T, (Option[T], Long)](None -> System.currentTimeMillis) {
  ...
  def handler = {
    case (event, _) =>
      if( tt has already waited and run its action, i.e. it's not waiting to do something ) {
        fire(event)
        tt = _timer.schedule(delay)(onTimer)
      }
      Some(event) -> System.currentTimeMillis
  }
}
Also, running a test I don't get what I expect to get from the current code:
import reactive._

object ExampleReactiveThrottle extends Observing {
  def main(args: Array[String]) {
    val x = new EventSource[Double]
    val result = x.throttle(1000L)
    result.foreach(println)
    x.fire(2.0)
    Thread.sleep(3000L)
    x.fire(1.0)
  }
}

I get just 2.0 instead of 2.0, 1.0.
Hmm, if you can find the bug it would be great! Does that happen every time or just sometimes?
But my original question was whether it is possible to do this in a functional way, using just the language of event streams and signals, so I guess it's not possible, no? The throttled method is using a var, so I guess it's not functional.
It could be it is possible, I just didn't put in the time to try. If you can suggest a better architecture that would enable doing so, I'm all ears.
best,Miguel