Sample
One such combinator is sample, which allows us to sample an observable at a given interval, thus throttling the source observable's output. Let's apply it to our previous example:
(rx/subscribe
 (->> (rx/map vector
              (.sample (fast-producing-obs) 200
                       TimeUnit/MILLISECONDS)
              (slow-producing-obs))
      (rx/map (fn [[x y]]
                (+ x y)))
      (rx/take 10))
 prn-to-repl
 (fn [e] (prn-to-repl "error is " e)))

;; 204
;; 404
;; 604
;; 807
;; 1010
;; 1206
;; 1407
;; 1613
;; 1813
;; 2012
The only change is that we call sample on our fast-producing observable before calling map. Sampling it every 200 milliseconds means that it passes along only the most recent item it emitted in each 200-millisecond time slice.
By ignoring all other items emitted in this time slice, we have mitigated our initial problem, even though the original observable doesn't support any form of backpressure.
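To make the effect of sampling concrete, here is a minimal, library-free sketch of the idea. It is not how RxJava implements sample. It models a stream as a sequence of [timestamp value] pairs and keeps the last item in each time slice. The names sample-items, fast-items, and sampled are hypothetical and exist only for this illustration.

```clojure
;; A simplified model of sampling; NOT RxJava's implementation.
;; Each item is a pair [timestamp-ms value], ordered by timestamp.
(defn sample-items
  "Keeps only the last item seen in each `period-ms` time slice.
  Time slices that contain no items produce nothing."
  [period-ms items]
  (->> items
       ;; Group consecutive items that fall in the same time slice.
       (partition-by (fn [[ts _]] (quot ts period-ms)))
       ;; Keep the most recent item of each slice and drop the rest.
       (map last)))

;; A hypothetical fast source: one item every 50 ms for one second.
(def fast-items
  (for [ts (range 0 1000 50)]
    [ts ts]))

;; Sample the source every 200 ms and keep just the values.
(def sampled
  (map second (sample-items 200 fast-items)))

;; sampled => (150 350 550 750 950)
```

Of the 20 items produced by the source, only 5 survive. Everything else in each slice is dropped, which is the trade-off that sampling makes.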
The sample combinator is only one of several combinators that are useful in such cases. Others include throttleFirst, debounce, buffer, and window. One drawback of this approach, however, is that many of the items generated end up being ignored.
Depending on the type of application we are building, this might be an acceptable compromise. But what if we are interested in all of the items?