Hands-On Reactive Programming with Clojure

Removing incidental complexity with RxClojure

In Chapter 2, A Look at Reactive Extensions, we learned about the basic building blocks of RxClojure, an open source CES framework. In this section, we'll use this knowledge to remove the incidental complexity from our program. This will give us a clear, declarative way to display both prices and rolling averages.

The UI code we've written so far remains unchanged, but we need to make sure that RxClojure is declared in the dependencies section of our project.clj file:

[io.reactivex/rxclojure "1.0.0"] 

Then, we must make sure that our namespace declaration requires the RxClojure library and imports the Java classes we will need:

(ns stock-market-monitor.core 
  (:require [rx.lang.clojure.core :as rx] 
            [seesaw.core :refer :all]) 
  (:import (java.util.concurrent TimeUnit) 
           (rx Observable))) 

Our approach to the problem will also be different this time. Let's take a look at the first requirement: we must display the current price of a company's share on the stock market.

Every time we query the price service, we get a possibly different price for the company in question. As we saw in Chapter 2, A Look at Reactive Extensions, modeling this as an observable sequence is easy, so we'll start with that. We'll create a function that gives us back a stock price observable for the given company:

(defn make-price-obs [company-code] 
  (rx/return (share-price company-code))) 

This is an observable that yields a single value and terminates. As a marble diagram, it is a timeline carrying one value followed immediately by the completion marker.
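
To see the "single value, then terminate" behavior at the REPL, a minimal sketch follows. It assumes that RxClojure is on the classpath and that rx/subscribe accepts on-next, on-error, and on-completed callbacks in that order. The share-price function here is only a hypothetical stand-in that returns a random number; it is not the price service used by the program.

```clojure
(require '[rx.lang.clojure.core :as rx])

;; Hypothetical stand-in for the real price lookup (assumption):
;; it ignores the company code and returns a random price.
(defn share-price [company-code]
  (rand-int 1000))

(defn make-price-obs [company-code]
  (rx/return (share-price company-code)))

;; Subscribe with all three callbacks to watch the whole life cycle:
;; one call to on-next, then a single call to on-completed.
(rx/subscribe (make-price-obs "XYZ")
              (fn [price] (println "next:" price))
              (fn [error] (println "error:" error))
              (fn []      (println "completed")))
```

Because the price is random, the exact number printed will vary, but a "next" line followed by "completed" is what we would expect to see.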

Part of the first requirement is that we query the service at a predefined time interval: every 500 milliseconds, in this case. This hints at an observable we have encountered before, aptly named interval. To get the polling behavior we want, we need to combine the interval and the price observables.

As you probably recall, flatmap is the tool for the job here:

(rx/flatmap (fn [_] (make-price-obs "XYZ")) 
            (Observable/interval 500 TimeUnit/MILLISECONDS)) 

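The preceding snippet creates an observable that will yield the latest stock price for XYZ every 500 milliseconds, indefinitely. In marble-diagram terms, each tick of the interval observable is replaced by the single price emitted by the observable that make-price-obs returns for it.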
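
Since this observable never completes, it can be handy to cap it while experimenting. The following is a sketch under a few assumptions: it uses rx/take to keep only the first three prices and the rx.lang.clojure.blocking namespace that ships with RxClojure to collect them into a vector, and it again relies on a hypothetical share-price stand-in rather than the real price service. The name first-three-prices is introduced here for illustration only.

```clojure
(require '[rx.lang.clojure.core :as rx]
         '[rx.lang.clojure.blocking :as rxb])
(import '(java.util.concurrent TimeUnit)
        '(rx Observable))

;; Hypothetical stand-in for the real price lookup (assumption).
(defn share-price [company-code]
  (rand-int 1000))

(defn make-price-obs [company-code]
  (rx/return (share-price company-code)))

;; Poll every 500 milliseconds, keep only the first three prices,
;; and block the calling thread until they have all arrived.
(def first-three-prices
  (->> (Observable/interval 500 TimeUnit/MILLISECONDS)
       (rx/flatmap (fn [_] (make-price-obs "XYZ")))
       (rx/take 3)
       (rxb/into [])))

(println first-three-prices)
```

Blocking like this is only meant for trying things out at the REPL. In the program itself, we subscribe to the observable instead, so that nothing waits on the polling.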

In fact, we can simply subscribe to this new observable and test it out. Modify your main function so that it looks like the following snippet and run the program:

(defn -main [& args] 
  (show! main-frame) 
  (let [price-obs (rx/flatmap (fn [_] (make-price-obs "XYZ")) 
                              (Observable/interval 500 TimeUnit/MILLISECONDS))] 
    (rx/subscribe price-obs 
                  (fn [price] 
                    (text! price-label (str "Price: " price)))))) 

This is very cool! We replicated the behavior of our first program with only a few lines of code. The best part is that we did not have to worry about thread pools or scheduling actions. By thinking about the problem in terms of observable sequences, as well as combining existing and new observables, we were able to declaratively express what we want the program to do.

This already provides great benefits in maintainability and readability. However, we are still missing the other half of our program: rolling averages.