Hands-On Reactive Programming with Clojure

Sample

One such combinator is sample, which allows us to sample an observable at a given interval, thus throttling the source observable's output. Let's apply it to our previous example:

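;; Assumes the rx alias and the helper functions from the previous example.
(import '(java.util.concurrent TimeUnit))

(rx/subscribe (->> (rx/map vector
                           ;; Emit at most one item from the fast observable every 200 ms
                           (.sample (fast-producing-obs) 200
                                    TimeUnit/MILLISECONDS)
                           (slow-producing-obs))
                   ;; Sum each [x y] pair produced by the map above
                   (rx/map (fn [[x y]]
                             (+ x y)))
                   (rx/take 10))
              prn-to-repl
              (fn [e] (prn-to-repl "error is " e)))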
 
;; 204 
;; 404 
;; 604 
;; 807 
;; 1010 
;; 1206 
;; 1407 
;; 1613 
;; 1813 
;; 2012 

The only change is that we call sample on our fast-producing observable before calling map. Sampling it every 200 milliseconds means it emits at most one item, the most recent one, per 200-millisecond time slice.

By ignoring all other items emitted in each time slice, we have mitigated our initial problem, even though the original observable doesn't support any form of backpressure.
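To make the effect of sampling concrete, here is a minimal sketch that models it on plain data, without RxJava. It assumes a simplified model that is not part of the original example: each item is a hypothetical [timestamp-ms value] pair, and time is cut into fixed slices that start at zero. The name sample-by-interval is made up for illustration; the real sample combinator works on live observables and a timer.

```clojure
;; Simplified model, not the RxJava implementation:
;; every item is a [timestamp-ms value] pair, in increasing timestamp order.
(defn sample-by-interval
  "Keeps only the last item that falls into each interval-ms time slice."
  [interval-ms items]
  (->> items
       ;; Group consecutive items that share the same time slice
       (partition-by (fn [[t _]] (quot t interval-ms)))
       ;; Keep the most recent item of each slice, drop the rest
       (map last)))

;; A hypothetical fast producer: one item every 50 ms, values 0 to 11
(def fast-items
  (map (fn [i] [(* i 50) i]) (range 12)))

(sample-by-interval 200 fast-items)
;; => ([150 3] [350 7] [550 11])
```

Out of twelve items, only three survive: the last one seen in each 200-millisecond slice. The other nine are silently dropped, which is exactly the trade-off discussed next.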

The sample combinator is only one of the combinators that are useful in such cases. Others include throttleFirst, debounce, buffer, and window. One drawback of this approach, however, is that many of the generated items end up being ignored.
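The difference between some of these combinators can be sketched on plain data. The following is a rough model under stated assumptions, not the RxJava behavior in full: items are hypothetical [timestamp-ms value] pairs, time slices are fixed and start at zero, and the helper name time-slices is invented for this illustration. In particular, the real throttleFirst opens its window at the first emitted item rather than on a fixed grid; with evenly spaced items starting at time zero, the two coincide.

```clojure
;; Simplified model: items are [timestamp-ms value] pairs in timestamp order.
(defn time-slices
  "Splits items into groups that share the same interval-ms time slice."
  [interval-ms items]
  (partition-by (fn [[t _]] (quot t interval-ms)) items))

;; A hypothetical fast producer: one item every 50 ms, values 0 to 7
(def items
  (map (fn [i] [(* i 50) i]) (range 8)))

;; throttleFirst-like: keep the first value of each slice
(map (comp second first) (time-slices 200 items))
;; => (0 4)

;; sample-like: keep the last value of each slice
(map (comp second last) (time-slices 200 items))
;; => (3 7)

;; buffer-like: keep every value, grouped by slice
(map #(mapv second %) (time-slices 200 items))
;; => ([0 1 2 3] [4 5 6 7])
```

In this model, the first two variants discard six of the eight items, whereas the buffer-like variant keeps all of them and only changes how they are delivered.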

Depending on the type of application we are building, this might be an acceptable compromise. But what if we are interested in all of the items?