Hands-On Reactive Programming with Clojure
上QQ阅读APP看书,第一时间看更新

Backpressure

Another issue we might be faced with is observables that produce items faster than we can consume them. The problem that arises in this scenario is to do with the ever-growing backlog of items.

As an example, think about zipping two observables together. The zip operator (or map in RxClojure) will only emit a new value when all observables have emitted an item.

So, if one of these observables is a lot faster at producing items than the others, map will need to buffer these items and wait for the others, which will most likely cause an error, as shown here:

(defn fast-producing-obs [] 
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS))) 
 
(defn slow-producing-obs [] 
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS))) 
 
(rx/subscribe (->> (rx/map vector 
                           (fast-producing-obs) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; "error is " #<MissingBackpressureException rx.exceptions.MissingBackpressureException> 

As we can see in the preceding code, we have a fast-producing observable that emits items 500 times faster than the slower observable. Clearly, we can't keep up with it and, surely enough, Rx throws MissingBackpressureException.

What this exception is telling us is that the fast-producing observable doesn't support any type of backpressurewhat Rx calls Reactive pull backpressure—that is, consumers can't tell it to go slower. Thankfully, Rx provides us with combinators, which are helpful in these scenarios.