
Transforming a diff of two Seqs

Naftoli Gugenheim
Reposted as a new thread (and edited), in case someone who'd be able to help didn't read the thread it was originally posted in the middle of.
There's a bug in some new reactive code (binary artifacts not published) that I haven't been able to solve.
To put the problem very generally:
Given Seq a' = Seq a + <some inserts/removes>, and Seq b = a dropWhile (some predicate), figure out Seq b' and the diff between b and b' without running the predicate more times than necessary. The easy way would be to just do a new dropWhile on a', but that means rerunning the predicate on elements it was already run on and then doing more diff calculation than necessary. The diff must be generated relative to the dropWhile of the previous parent, as efficiently as possible. The code is already written; it just has some bugs. I would really appreciate it if anyone could fix it.
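For reference, the "easy way" can be sketched as follows. This is only an illustrative baseline, not code from the reactive library: NaiveDropWhile, CountingPredicate, and recompute are hypothetical names introduced here to make the repeated predicate calls visible.

```scala
object NaiveDropWhile {
  // Hypothetical helper: wraps a predicate and counts how often it is evaluated.
  final class CountingPredicate[T](p: T => Boolean) extends (T => Boolean) {
    var calls = 0
    def apply(x: T): Boolean = { calls += 1; p(x) }
  }

  // Naive update: ignore the parent's delta and rerun dropWhile on the whole new parent.
  def recompute[T](parentUpdated: Seq[T], p: T => Boolean): Seq[T] =
    parentUpdated.dropWhile(p)
}
```

With the predicate _ < 10, recomputing after a single element 7 is inserted into (0, 5, 10, 15) evaluates the predicate on 0, 5, 7 and 10 again, even though only one element is new.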
[This is for SeqSignals, a Signal whose value is a DeltaSeq. Whenever its value changes, the new DeltaSeq knows the diff relative to the SeqSignal's previous value, and the SeqSignal fires that as an event. This allows derived objects to act on the incremental changes with greater efficiency. For instance, if you're rendering a table of records and one is inserted, the table will know that only one row has to be inserted into the GUI, corresponding to the row in the model. To see it in action go to http://www.reactive-web.co.cc/showdemo/RepeaterDemo, open Chrome Developer Tools or Firebug to watch network activity, and click the Add button on the page a few times.]
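To make the idea of "acting on the incremental changes" concrete, here is a minimal sketch of applying deltas to a sequence. The types are simplified stand-ins that borrow the names Include, Remove and Update from the post; they are assumptions for illustration and not the library's actual SeqDelta classes. DeltaSketch, applyDelta and applyAll are hypothetical names.

```scala
object DeltaSketch {
  // Hypothetical, simplified delta types (one type parameter instead of the library's two).
  sealed trait Delta[T]
  final case class Include[T](index: Int, elem: T) extends Delta[T]
  final case class Remove[T](index: Int, old: T) extends Delta[T]
  final case class Update[T](index: Int, old: T, elem: T) extends Delta[T]

  // Apply one delta to an immutable sequence.
  def applyDelta[T](xs: Vector[T], d: Delta[T]): Vector[T] = d match {
    case Include(i, e)   => xs.patch(i, Seq(e), 0) // insert e at index i
    case Remove(i, _)    => xs.patch(i, Nil, 1)    // drop the element at index i
    case Update(i, _, e) => xs.updated(i, e)       // replace the element at index i
  }

  // Apply a batch in order; each index refers to the sequence as left by the previous delta.
  def applyAll[T](xs: Vector[T], ds: Seq[Delta[T]]): Vector[T] =
    ds.foldLeft(xs)((acc, d) => applyDelta(acc, d))
}
```

A consumer such as the table in the example would do the equivalent of applyDelta per event: one Include means one row inserted, rather than the whole table rebuilt.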
Here's an illustration:
Suppose SeqSignal b = SeqSignal a dropWhile(_ < 10). If SeqSignal a has (0,5,10,15), SeqSignal b will have (10, 15). Now when SeqSignal a's value changes, e.g. to (10,0,5,10,15), we want SeqSignal b to say to its underlying DeltaSeq: Here's a's previous DeltaSeq and a's updated DeltaSeq, now compute the DeltaSeq that will replace you. It should be equivalent to List(10,0,5,10,15).dropWhile(_<10) but compute the result based on the deltas from a's last value to its current value and the dropWhile predicate.
If anyone can get it to work I'd really appreciate it!! You can see the relevant source code at https://github.com/nafg/reactive/blob/master/reactive-core/src/main/scala/reactive/DeltaSeq.scala (mainly lines 261-416, although see also the comment at line 56).
To give it a spin,
git clone git://github.com/nafg/reactive.git
cd reactive
<editor> reactive-core/src/main/scala/reactive/DeltaSeq.scala
sbt '~test-only reactive.DeltaSeqTests'


Here's the current, immediately relevant code (as you can see, it's full of bandaids as I tried tweaking it; every time I fixed one thing another thing broke):
  sealed abstract class Transformed[T, V] extends DeltaSeq[V] {
    type This <: Transformed[T, V]
    def parent: DeltaSeq[T]
    /**
     * Given a DeltaSeq meant to replace the parent DeltaSeq,
     * return a DeltaSeq that is equivalent to reapplying the
     * transformation this TransformedSeq was derived from to the replacement
     * TransformedSeq.
     * That is, given
     * val a: DeltaSeq[Int]            // 1, 2, 3
     * val b: DeltaSeq[Int]            // 2, 3 (fromDelta == Remove(0, 1))
     * val c: DeltaSeq[Int]            // 2, 3, 4 (fromDelta == Include(2, 4))
     * val x = a map (10 *)            // 10, 20, 30
     * val y = x.updatedFromParent(b)  // 20, 30 (fromDelta == Remove(0, 10))
     * val z = y.updatedFromParent(c)  // 20, 30, 40 (fromDelta == Include(2, 40))
     *
     * Ideally the TransformedSeq should be inferred from parentUpdated.fromDelta,
     * so in the above example each call to updatedFromParent only translates one delta.
     */
    /*
     * [1, 2, 3]                                 [2, 3](Remove(0,1))                         [2, 3, 4](Include(2, 4))
     *     |                                                  |                                         |
     *    \/                                                 \/                                        \/
     * flatMap(x => List(x*10,x*10+1).filter(_<41))  updatedFromParent                           updatedFromParent
     *     |                                                  |                                         |
     *    \/                                                 \/                                        \/
     * [0->(0..1), 1->(2..3), 2->(4..5)]         [0->(0..1), 1->(2..3)]                      [0->(0..1), 1->(2..3), 2->(4)]
     * [10,11, 20,21, 30,31]                     [20,21, 30,31](Remove(0,10),Remove(0,11))   [20,21, 30,31, 40](Include(4, 40))
     *
     */
    def updatedFromParent(parentUpdated: DeltaSeq[T]): This

    lazy val signal = new SeqSignal[V] {
      private var current: This = Transformed.this.asInstanceOf[This]
      current.underlying
      def now = current
      val pc: DeltaSeq[T] => Unit = { ds =>
        current = current.updatedFromParent(ds).asInstanceOf[This]
        current.underlying
        change fire now
      }
      parent.signal.change addListener pc
      lazy val change = new EventSource[DeltaSeq[V]] {}
    }
  }

  class PrefixWhile[T](val parent: DeltaSeq[T], val take: Boolean, val p: T => Boolean) extends Transformed[T, T] { prev =>
    type This = PrefixWhile[T]
    lazy val predValues = parent.underlying.toStream map { x => x -> p(x) }
    lazy val underlying: Seq[T] = if (take)
      predValues.takeWhile{ case (_, b) => b }.map(_._1)
    else
      predValues.dropWhile{ case (_, b) => b }.map(_._1)
    lazy val fromDelta = startDelta(underlying)

    /*
     * takeWhile and dropWhile both see a sequence as divided into a prefix of elements that pass a predicate, and the remaining elements,
     * beginning with the first element that fails the predicate. (It makes no difference whether subsequent elements pass it or fail it.)
     * The difference is that takeWhile returns the prefix, and dropWhile returns the remaining elements.
     *
     * Edits can occur within the prefix specified by the predicate, at its border, or outside it.
     * At its border means either removing the first element failing the predicate, inserting a new element at its index, or updating the value at its index.
     *
     * In the case of takeWhile, an edit occurring at the first element for which the predicate fails
     * can cause any number of elements to be included, if the element that will now follow the current prefix will pass the predicate.
     * In the case of dropWhile, it can cause any number of elements to be removed, if the element that will now follow the
     * prefix will pass the predicate.
     *
     * An edit more than one past the end of the prefix has no effect in the case of takeWhile; in the case of dropWhile, that edit is
     * applied after translating it (i - prefixLength).
     *
     * A remove, or insert or update whose new element passes the predicate, that occurs before the end of the prefix, has no effect in the case of dropWhile;
     * in the case of takeWhile, it can be applied directly.
     *
     * An insert or update within the prefix, whose new element fails the predicate, in the case of takeWhile is not applied,
     * and subsequent elements are removed, as the prefix has been shortened.
     * In the case of dropWhile, it and subsequent elements are inserted; since the prefix has been shortened, the set of remaining elements has expanded.
     *
     * prefixLength: The number of elements in a row at the beginning of the parent sequence that pass the predicate. Consequently, the first index
     * that is either outside of the parent sequence or whose element fails the predicate.
     *
     */
    def updatedFromParent(parentUpdated: DeltaSeq[T]): PrefixWhile[T] = new PrefixWhile[T](parentUpdated, prev.take, prev.p.asInstanceOf[T => Boolean]) {
      override lazy val (fromDelta, predValues, underlying) = {
        val buf = prev.toList.toBuffer
        val ds = new ArrayBuffer[SeqDelta[T, T]]
        var prdVals = prev.predValues
        def calcPrefixLength = {
          def loop(n: Int, s: Stream[(T, Boolean)]): Int = if (s.isEmpty || !s.head._2) n else loop(n + 1, s.tail)
          loop(0, prdVals)
        }
        var prefixLength = calcPrefixLength
        def applyDelta(d: SingleDelta[T, T]): Unit = {
          val oldPrefixLength = prefixLength
          d match {
            case Include(i, e) =>
              val v = p(e)
              prdVals = prdVals.patch(i, List(e -> v), 0)

              if (i < prefixLength) {
                if (v) prefixLength += 1 else prefixLength = i
              } else if (v && i == prefixLength) {
                prefixLength = calcPrefixLength
              }

              if (!take || v) {
                if (take && v && i <= prefixLength && i <= buf.length) {
                  ds += Include(i, e)
                  buf.insert(i, e)
                } else if (!take && i >= prefixLength) {
                  val j = i - prefixLength
                  println("Applying original Include at "+j)
                  ds += Include(j, e)
                  buf.insert(j, e)
                }
              }

              if (prefixLength > oldPrefixLength) for (j <- oldPrefixLength + 1 until prefixLength) {
                if (take && j <= buf.length) {
                  ds += Include(j, prdVals(j)._1)
                  buf.insert(j, prdVals(j)._1)
                } else if (!take && j < buf.length) {
                  ds += Remove(0, prdVals(oldPrefixLength)._1)
                  buf.remove(0)
                }
              } else if (prefixLength < oldPrefixLength) for (j <- prefixLength until oldPrefixLength) {
                println("j: "+j)
                if (take && prefixLength < buf.length) {
                  ds += Remove(prefixLength, prdVals(j)._1)
                  buf.remove(prefixLength)
                } else if (!take && j - prefixLength <= buf.length) {
                  ds += Include(j - prefixLength + 1, prdVals(j + 1)._1)
                  buf.insert(j - prefixLength + 1, prdVals(j + 1)._1)
                }
              }

            case Remove(i, o) =>
              prdVals = prdVals.patch(i, Nil, 1)
              if (i < prefixLength)
                prefixLength -= 1
              else if (i == prefixLength)
                prefixLength = calcPrefixLength

              if (take && i <= prefixLength && i < buf.length) {
                ds += Remove(i, o)
                buf.remove(i)
              } else if (!take && i >= prefixLength) {
                val j = i - prefixLength max 0
                if (j < buf.length) {
                  println("Applying original Remove at "+j)
                  ds += Remove(j, o)
                  buf.remove(j)
                }
              }

              if (prefixLength > oldPrefixLength) for (j <- oldPrefixLength until prefixLength) {
                if (take && j <= buf.length && (prdVals isDefinedAt j)) {
                  ds += Include(j, prdVals(j)._1)
                  buf.insert(j, prdVals(j)._1)
                } else if (!take && buf.nonEmpty && (prdVals isDefinedAt j)) {
                  ds += Remove(0, prdVals(j)._1)
                  buf.remove(0)
                }
              } else if (prefixLength < oldPrefixLength) {
                for (j <- (if (take) prefixLength else prefixLength) until oldPrefixLength) {
                  println("j: "+j)
                  if (take && j < buf.length && (prdVals isDefinedAt j)) {
                    ds += Remove(prefixLength, prdVals(j)._1)
                    buf.remove(prefixLength)
                  } else if (!take && i >= prefixLength && j - prefixLength <= buf.length && (prdVals isDefinedAt j + 1)) {
                    ds += Include(j - prefixLength, prdVals(j + 1)._1)
                    buf.insert(j - prefixLength, prdVals(j + 1)._1)
                  }
                }
              }

            case Update(i, o, e) =>
              applyDelta(Remove(i, o))
              applyDelta(Include(i, o))
          }
        }
        println("prev.parent: "+prev.parent)
        println("parentUpdated: "+parentUpdated)
        println("take: "+take)
        SeqDelta flatten List(parentUpdated.fromDelta) foreach { d =>
          println("Delta: "+d)
          println(">prdVals: "+prdVals)
          println(">prefixLength: "+prefixLength)
          println(">buf: "+buf)
          println(">ds: "+ds)
          applyDelta(d)
          println("<prdVals: "+prdVals)
          println("<prefixLength: "+prefixLength)
          println("<buf: "+buf)
          println("<ds: "+ds)
        }
        (Batch(ds: _*), prdVals, buf.toSeq)
      }
    }
  }
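The prefix rules described in the comment can be tried out in isolation with a much smaller model. The following is a rough sketch under several assumptions, not a fix for PrefixWhile: it covers only dropWhile, only Include and Remove, keeps the whole parent in a Vector, and uses simplified hypothetical types (IncrementalDropWhile, State, Delta, initial, update) rather than the library's DeltaSeq and SeqDelta. It relies on the observation that the only facts needed are the prefix length and that the element at the border, if there is one, fails the predicate.

```scala
object IncrementalDropWhile {
  // Hypothetical, simplified delta types (not the library's SeqDelta hierarchy).
  sealed trait Delta[T]
  final case class Include[T](index: Int, elem: T) extends Delta[T]
  final case class Remove[T](index: Int, old: T) extends Delta[T]

  // prefixLength = number of leading parent elements that pass the predicate.
  final case class State[T](parent: Vector[T], prefixLength: Int) {
    def result: Vector[T] = parent.drop(prefixLength) // same as parent.dropWhile(p)
  }

  // Counts consecutive elements from `from` that pass p, stopping at the first failure.
  private def run[T](xs: Vector[T], from: Int, p: T => Boolean): Int = {
    var n = 0
    while (from + n < xs.length && p(xs(from + n))) n += 1
    n
  }

  def initial[T](parent: Vector[T], p: T => Boolean): State[T] =
    State(parent, run(parent, 0, p))

  // Translate one parent delta into the new state plus the deltas for the dropWhile result.
  def update[T](s: State[T], d: Delta[T], p: T => Boolean): (State[T], Vector[Delta[T]]) = {
    val n = s.prefixLength
    d match {
      case Include(i, e) =>
        val parent = s.parent.patch(i, Seq(e), 0)
        if (i > n) (State(parent, n), Vector(Include(i - n, e))) // past the border: shift the index
        else if (p(e)) (State(parent, n + 1), Vector.empty)      // joins the prefix: result unchanged
        else {
          // The prefix now ends at i: e and the old prefix elements after it enter the result.
          val exposed = parent.slice(i, n + 1)
          (State(parent, i), exposed.zipWithIndex.map { case (x, k) => Include(k, x) })
        }
      case Remove(i, o) =>
        val parent = s.parent.patch(i, Nil, 1)
        if (i > n) (State(parent, n), Vector(Remove(i - n, o)))  // past the border: shift the index
        else if (i < n) (State(parent, n - 1), Vector.empty)     // inside the prefix: result unchanged
        else {
          // The border element was removed: following elements may now extend the prefix.
          val extra = run(parent, n, p)
          val absorbed = parent.slice(n, n + extra)
          (State(parent, n + extra), (o +: absorbed).map(x => Remove(0, x)))
        }
    }
  }
}
```

Using the illustration from earlier in the post: starting from the parent (0, 5, 10, 15) with the predicate _ < 10, a parent Include(0, 10) evaluates the predicate once and yields Include(0, 10), Include(1, 0), Include(2, 5) for the derived sequence, which then equals (10, 0, 5, 10, 15). An Update could be handled as a Remove followed by an Include of the new element.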
