
The Communicating Sequential Processes paper

The core.async library is built on an old idea. Its foundation was first described by Tony Hoare, of Quicksort fame, in his 1978 paper Communicating Sequential Processes (CSP)[1]. CSP has since been extended and implemented in several languages, most recently in Google's Go programming language[2].

It is beyond the scope of this book to go into the details of this seminal paper, so what follows is a simplified description of the main ideas.

In CSP, work is modeled using two main abstractions: channels and processes. CSP is also message-driven and, as such, it completely decouples the producer from the consumer of the message. It is useful to think of channels as blocking queues.

A simplistic approach that demonstrates these basic abstractions is as follows:

(import 'java.util.concurrent.ArrayBlockingQueue) 
 
(defn producer [c] 
  (prn "Taking a nap") 
  (Thread/sleep 5000) 
  (prn "Now putting a name in queue...") 
  (.put c "Leo")) 
 
(defn consumer [c] 
  (prn "Attempting to take value from queue now...") 
  (prn (str "Got it. Hello " (.take c) "!"))) 
 
(def chan (ArrayBlockingQueue. 10)) 
 
(future (consumer chan)) 
(future (producer chan)) 

Running this code in the REPL should show us an output similar to the following:

    "Attempting to take value from queue now..."
    "Taking a nap"
    ;; then 5 seconds later
    "Now putting a name in queue..."
    "Got it. Hello Leo!"  

So that we don't block our program, we start both the consumer and the producer in their own threads, using future. Since the consumer was started first, we will most likely see its output immediately. However, as soon as it attempts to take a value from the channel—or queue—it will block. It will wait for a value to become available and will only proceed after the producer is done taking its nap—clearly a very important task.

Now, let's compare it with a solution using core.async. First, create a new Leiningen project and add the following dependency to it:

[org.clojure/core.async "0.4.474"] 
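
If you are not sure where this dependency goes, a minimal project.clj might look like the following sketch (the project name mirrors the namespace used below, and the Clojure version is only an assumption):

(defproject core-async-playground "0.1.0-SNAPSHOT"
  ;; the Clojure version here is illustrative; any recent release will do
  :dependencies [[org.clojure/clojure "1.9.0"]
                 [org.clojure/core.async "0.4.474"]])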

Now, type the following into the REPL or in your core namespace:

(ns core-async-playground.core
  (:require [clojure.core.async :refer [go chan <! >! timeout <!!]]))

(defn prn-with-thread-id [s]
  (prn (str s " - Thread id: " (.getId (Thread/currentThread)))))

(defn producer [c]
  (go (prn-with-thread-id "Taking a nap")
      (<! (timeout 5000))
      (prn-with-thread-id "Now putting a name in the queue...")
      (>! c "Leo")))

(defn consumer [c]
  (go (prn-with-thread-id "Attempting to take value from queue now...")
      (prn-with-thread-id (str "Got it. Hello " (<! c) "!"))))

(def c (chan))

(consumer c)
(producer c)

This time, we are using a helper function, prn-with-thread-id, which appends the current thread ID to the output string. I will explain why shortly, but apart from that, the output will be equivalent to the previous one:

"Attempting to take value from queue now... - Thread id: 43"
"Taking a nap  - Thread id: 44"
"Now putting a name in que queue... - Thread id: 48"
"Got it. Hello Leo! - Thread id: 48"  

Structurally, both solutions look fairly similar, but since we are using quite a few new functions here, let's break it down:

  • chan is a function that creates a core.async channel. As mentioned previously, it can be thought of as a concurrent blocking queue and is the main abstraction in the library. By default, chan creates an unbuffered channel, but core.async provides many more useful channel constructors, a few of which we'll be using later (there is a short sketch after this list).
  • timeout is another such channel constructor. It gives us a channel that stays open for the given number of milliseconds and then closes; a take on it parks until then and returns nil once the channel has closed. This is the core.async equivalent of Thread/sleep.
  • The >! and <! functions are used to put and take values from a channel, respectively. The caveat is that they have to be used inside a go block, as we will explain later.
  • go is a macro that takes a body of expressions—that form a go block—and creates lightweight processes. This is where the magic happens. Inside a go block, any calls to >! and <! that would ordinarily block, waiting for values to be available in channels, are instead parked. Parking is a special type of blocking that's used internally in the state machine of core.async. A blog post by Huey Petersen covers this state machine in depth (see http://hueypetersen.com/posts/2013/08/02/the-state-machines-of-core-async/).
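
As a brief, illustrative sketch of a few of these pieces (the buffer size and timeout values below are arbitrary), the following can be evaluated in the same namespace:

;; A buffered channel: puts won't park until the buffer holds 10 items
(def buffered (chan 10))

;; <!! is the blocking take: unlike <!, it can be used outside go blocks,
;; blocking the calling thread until a value is available
(go (>! buffered "hello"))
(prn (<!! buffered))             ; prints "hello"

;; Taking from a timeout channel parks the go block for ~1 second,
;; then returns nil once the channel closes
(go (prn (<! (timeout 1000))))   ; prints nil after roughly one second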

The go blocks are the very reason I chose to print the thread IDs in our example. If we look closely, we'll realize that the last two statements were executed in the same thread. However, this isn't true 100% of the time, as concurrency is inherently non-deterministic. This is the fundamental difference between core.async and solutions that use threads/futures.

Threads can be expensive. On the JVM, their default stack size is typically in the range of 512 kilobytes to 1 megabyte, depending on the platform, and can be configured via the -Xss JVM startup option. When developing a highly concurrent system, creating thousands of threads can quickly drain the resources of the machine the application is running on.

core.async acknowledges this limitation and gives us lightweight processes. Internally, they do share a thread pool, but instead of wastefully creating a thread per go block, threads are recycled and reused when a put/take operation is waiting for a value to become available.

At the time of writing, the thread pool used by core.async defaults to (the number of available processors x 2) + 42 threads. So, a machine with eight processors will have a pool of 58 threads.

Therefore, it is common for core.async applications to have tens of thousands of lightweight processes, as they are extremely cheap to create.
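
As a rough, illustrative experiment (the count of 10,000 here is arbitrary), we can start thousands of go blocks that all park on a timeout and then collect their results:

;; Start 10,000 lightweight processes; each parks on a 100 ms timeout
;; and then puts a value on the out channel. Creating 10,000 real threads
;; for the same job would be dramatically more expensive.
(def results
  (let [out (chan)]
    (dotimes [_ 10000]
      (go (<! (timeout 100))
          (>! out 1)))
    out))

;; Take the 10,000 values back out on the calling thread and add them up
(prn (reduce + (repeatedly 10000 #(<!! results))))   ; prints 10000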

Since this is a book on Reactive Programming, the question that might be in your head now is: can we build reactive applications using core.async? The short answer is yes, we can! To prove it, we will revisit our stock market application and rewrite it using core.async.