Saturday, January 5, 2013

Go Concurrency Constructs in Clojure, part 2: select

"The select statement is a key part of why concurrency is built into Go as features of the language, rather than just a library. It's hard to do control structures that depend on libraries."

-Rob Pike, 2012 Google IO Conference

In the first blog entry of this series, I introduced some simple examples of the CSP (Communicating Sequential Processes) model of concurrency that has been built into the Go language. I'm blogging my investigation of how we might leverage this style of concurrent programming in Clojure.

The key benefit of the CSP approach is that you can use normal sequential semantics and control flows that are easy to reason about while building concurrent flows and processes. Channels are used to communicate and synchronize processes to bring control and deterministic behavior to an otherwise non-deterministic concurrent environment. We can do this without locks or other low-level constructs that are hard to reason about. The CSP constructs are built on top of those low-level primitives (or at least compare-and-swap mechanisms), but they are hidden from the application developer.


/* ---[ A construct to wait for the next available channel ]--- */

Go comes with a ready-made control structure called select. It provides a shorthand way to specify how to deal with multiple channels, and it also allows for timeouts and non-blocking behavior (via a "default" clause). It looks like a switch/case statement in C-based languages, but it differs in that every case involving a channel is evaluated, rather than the cases being tried in order and the first match taken.

Let's look at an example (adapted from Pike's 2012 Google IO talk):


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 }

This select wraps two channels. It evaluates both channels and there are four possible scenarios:

  1. c1 is ready to give a message, but c2 is not. The message from c1 is read into the variable v1 and the code clause for that first case is executed.
  2. c2 is ready to give a message, but c1 is not. v2 then is assigned to the value read from c2 and its code clause is executed.
  3. Both c1 and c2 are ready to give a message. One of them is randomly chosen to execute and the other does not execute. Note this means that you cannot depend on the order your clauses will be executed in.
  4. Neither c1 nor c2 is ready to give a message. The select blocks until one of them is ready, at which point it reads from that channel and executes the corresponding code clause.
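
The scenarios above can be tried out with a small runnable program. This is only a sketch: recvEither is a helper name made up for this illustration, and the channels are buffered purely so that main can make them "ready" without starting extra goroutines.

```go
package main

import "fmt"

// recvEither blocks until c1 or c2 has a message, then reports which
// channel it came from. If both are ready, Go picks one at random.
// (Hypothetical helper for this illustration.)
func recvEither(c1, c2 <-chan string) string {
	select {
	case v1 := <-c1:
		return fmt.Sprintf("received %v from c1", v1)
	case v2 := <-c2:
		return fmt.Sprintf("received %v from c2", v2)
	}
}

func main() {
	// Buffered so that a send succeeds without a waiting receiver.
	c1 := make(chan string, 1)
	c2 := make(chan string, 1)

	// Scenario 2: only c2 is ready.
	c2 <- "ping"
	fmt.Println(recvEither(c1, c2))

	// Scenario 3: both are ready, so either case may run.
	c1 <- "a"
	c2 <- "b"
	fmt.Println(recvEither(c1, c2))
}
```

The first call always reports c2. The second call may report either channel, and the result can differ from run to run.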

Select statements can also have a default to make it non-blocking:


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 default:
     fmt.Println("no channel was ready to communicate")
 }

If neither channel is ready, the select executes the default clause and returns immediately.
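
A minimal sketch of that non-blocking behavior, wrapped in a function so the result can be inspected. The name tryRecv and its (value, ok) return shape are assumptions made for this example, not something from Pike's talk.

```go
package main

import "fmt"

// tryRecv never blocks: ok is false when neither channel was ready.
// (Hypothetical helper for this illustration.)
func tryRecv(c1, c2 <-chan int) (v int, ok bool) {
	select {
	case v = <-c1:
		return v, true
	case v = <-c2:
		return v, true
	default:
		// Neither channel had a value waiting.
		return 0, false
	}
}

func main() {
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)

	_, ok := tryRecv(c1, c2)
	fmt.Println(ok) // false: nothing has been sent yet

	c1 <- 42
	v, ok := tryRecv(c1, c2)
	fmt.Println(v, ok) // 42 true
}
```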

Finally, select statements can also have a timeout:


 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-time.After(1 * time.Second):
         fmt.Println("You're too slow!")
         return // leave the loop (and the enclosing function) on timeout
     }
 }

In this example, the select is wrapped in an infinite loop, which will stop the first time any one round takes longer than 1 second to read from either channel. But we can also set a timeout on the loop as a whole:


 timeout := time.After(1 * time.Second)
 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-timeout:
         fmt.Println("Time's up!")
         return // leave the loop (and the enclosing function) on timeout
     }
 }

Now the loop will always cease after 1 second and in that one second it will read as many times as possible from either channel.

Here is an example of using selects with timeouts in a Go program:


/* ---[ Implementing select in Clojure ]--- */

Let's evaluate some of the ways we could emulate or implement the behavior of select in Clojure. While Go does have closures, treats functions as first class entities and deemphasizes object-orientation and inheritance, Go is not a functional language. So how should something like select be done in Clojure? What is the essence of what it accomplishes?

Let's first turn to the Racket language, a Lisp that is a descendant of Scheme. It has Events in the language. I am not deeply knowledgeable about Racket, but from the research I've done the analog of select in Racket is sync. The sync function takes one or more "synchronizable events", blocks until the first one is ready and returns that result:


 (let ((msg (sync evt1 evt2 evt3)))
    ;; do something with the first message result here
   )

As with Go's select, Racket's sync will choose to read from one of the events at random if more than one is ready.

Notice that the Racket version does not take a code block to execute for each event. In functional programming, it is preferable and more natural to return a value from an operation. Go's select is truly a control structure (in the C language sense of the word) - it does not return a value.

So let's implement Racket's sync in Clojure.

In order to implement select/sync in Clojure using the Java Queue libraries we used in the previous blog entry, we will need to be able to check whether more than one of the queues has a value ready without blocking. That is why I selected the TransferQueue over the SynchronousQueue.

Next we have to decide what to name it. sync is already taken in clojure.core and has a specific and important enough meaning in Clojure that it is best avoided. select is also used -- it is a function in the clojure.set namespace -- but since it is not in clojure.core, I'll go with it in my go-lightly namespace.

My initial implementation to get started is a simple one - it will check all the channels to see if any are ready and if not, do a short sleep. To do the check, it uses the .peek method of TransferQueue, since it neither blocks nor throws an exception if the queue is empty.

You pass select one or more channels and it immediately filters for those that already have a ready value. If there are any, it picks one of the ready ones at random, dequeues the value and returns it. Only the one value is dequeued, so the other channels remain untouched.

If none are ready, it will "probe" the channels between short sleeps to get the first value it can find. This is an unsophisticated implementation, but it works for simple uses. (I'll provide a usage example after we add timeouts and "defaults" next.)


/* ---[ Adding "default" and timeouts to Clojure select ]--- */

The default clause in Go's select statement is a short circuit to not block if no channels are ready. Since Clojure's select is not a control structure, the most natural choice is to add another function, which I've called select-nowait.

As before, it takes one or more channels (as a varargs list) and an optional sentinel keyword value. If no channels are ready, select-nowait will return the sentinel keyword (if provided) or nil.


 user=> (select-nowait ch1 ch2 ch3 :bupkis)
 :bupkis

For timeouts, the Go example above shows that they come in two flavors: a timeout per round (timer starts each time you call select) or a timeout for a "conversation" that could involve multiple rounds of selecting the next value.

Let's take these one at a time, as they will have different solutions in my implementation. For a timeout-per-select call, I've created a select-timeout function that takes a timeout (in milliseconds) as the first argument.


 ;; returns a value from one of the channels if it can
 ;; be read within 1 sec.  Otherwise it times out and
 ;; returns :go-lightly/timeout
 user=> (select-timeout 1000 ch1 ch2 ch3)
 :go-lightly/timeout

For an overall timeout, I provide two options.

First, following the pattern in Alexey Kachayev's example of doing this with the lamina library, we build a channel that will hold a timeout sentinel value once the timer goes off. Use the go-lightly timeout-channel factory fn and then pass that timeout channel to the select function.

In order for the timeout-channel to be effective, you have to be continuously calling select until you hit the timeout. Also, the current implementation of select doesn't preferentially look at the timeout channel first and select that over other channels if it is ready, but I'll be fixing that later in the series.

You can also pass a timeout-channel into select-timeout if you want both types of timers running.

Second, I've added a general purpose with-timeout macro to the go-lightly.core library that wraps any arbitrary set of statements in a timeout.

The full implementation of these timeout functions is in the go-lightly project on GitHub.

All of these options are shown in this Clojure go-lightly example implementation of the Go "boring" select example:

Note: the channels here are no longer raw LinkedTransferQueues - they are go-lightly GoChannel type entities. See the go-lightly wiki for a detailed explanation.


/* ---[ Emulating Go's select in lamina ]--- */

lamina's analog to select is its join operation, which basically routes the output of multiple lamina channels into a single channel:


 user=> (use 'lamina.core)
 nil
 user=> (def ch1 (channel))
 #'user/ch1
 user=> (def ch2 (channel))
 #'user/ch2
 user=> (def ch3 (channel))
 #'user/ch3
 user=> (join ch1 ch3)
 true
 user=> (join ch2 ch3)
 true
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== […]]
 user=> (enqueue ch1 :one)
 :lamina/enqueued
 user=> (enqueue ch2 :two :three)
 :lamina/enqueued
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== [:one :two :three …]]
 

You can then read from the downstream channel:


 user=> @(read-channel ch3)
 :one
 user=> @(read-channel ch3)
 :two

To create a whole-conversation timeout, you can call the periodically fn that invokes your fn every 'period' milliseconds and returns the value. This was the inspiration for go-lightly's timeout-channel.

To create a per-round timeout, you can use either the read-channel* macro or the channel->lazy-seq function, both of which take a per-read timeout.

This program demonstrates these options (and a few others) using lamina (with some helper functions from go-lightly):


/* ---[ Implementing Go's select in Clojure ]--- */

So we can provide Racket's sync functionality in Clojure either by implementing it ourselves or using lamina, but it is not as powerful as Go's select. What if you need to know not only the next value on the channels, but which channel it was read from? In that case, providing a function to execute per channel is a nice model. But to be more or less functional, the select statement still needs to return a value.

Let's hit an important point here: as I quoted at the start of this post, Pike has said that "it's hard to do control structures that depend on libraries". This is true in some languages, but not all - especially not in Lisps. You can do control structures with macros or sometimes just with functions, and this is one of the key advantages of Lisp languages.

In the go-lightly library, I've implemented this as selectf and it turns out I didn't need a macro.

Here's an example of using go-lightly's selectf from the sleeping-barbers example app in the go-lightly-examples project:


  (defn barber-shop [clients-ch]
    (let [barber-ch (channel)]
      (loop [shop-state {:free-barbers (init-barber-vector)
                         :waiting-clients []}]
        (-> (selectf
             clients-ch #(client-walked-in % barber-ch shop-state)
             barber-ch  #(barber-available % barber-ch shop-state))
            (recur)))))

selectf takes pairs of arguments where the first member of the pair is a channel (or the :default keyword) and the second member of the pair is a function that takes one argument - the value read from that channel. (A function paired with :default takes no arguments.)

The return value of selectf is whatever the fn you provide returns. In the example above, I pass this value to the recur form so that I can reset the shop-state local var without having to use an atom to manage state changes.

And here is the implementation of selectf:


 (defn selectf
   "Control structure variable arity fn. Must be an even number of arguments where
   the first is either a GoChannel to read from or the keyword :default. The second
   arg is a function to call if the channel is read from.  Handler fns paired with
   channels should accept one argument - the value read from the channel.  The
   handler function paired with :default takes no args.  If no :default clause is
   provided, it blocks until a value is read from a channel (which could include
   a TimeoutChannel). Returns the value returned by the handler fn."
   [& args]
   (binding [*choose-fn* choose-tuple]
     (let [chfnmap (apply array-map args)
           [keywords chans] (partition-bifurcate
                             keyword?
                             (reduce #(conj % %2) [] (keys chfnmap)))
           choice (doselect chans nil (first keywords))]

       ;; invoke the associated fn
       (if choice
         ((chfnmap (nth choice 0)) (nth choice 1))
         ((chfnmap (first keywords)))))))

I won't give a full explanation of this implementation and all its helper functions, but notice this piece:


  (let [chfnmap (apply array-map args)
        ...
        ])

That's all that is required to turn the argument pairs into a control structure. It creates a map of channels to fns and once you have a map in Clojure, programming is straightforward.
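
For comparison, the same idea of pairing channels with handler functions and returning the handler's result can be sketched on the Go side using the standard library's reflect.Select, which is a function form of select that reports which case fired. This is only an illustration of the concept: the clause type and this Go selectf are hypothetical names invented here, there is no :default or timeout support, and closed channels are not handled.

```go
package main

import (
	"fmt"
	"reflect"
)

// clause pairs a channel with the handler to run on the value read from it.
// (Hypothetical type for this sketch; not part of go-lightly or Go itself.)
type clause struct {
	ch      <-chan string
	handler func(string) string
}

// selectf blocks until one clause's channel is ready, calls that clause's
// handler with the value read, and returns whatever the handler returns.
func selectf(clauses ...clause) string {
	cases := make([]reflect.SelectCase, len(clauses))
	for i, cl := range clauses {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(cl.ch),
		}
	}
	// chosen is the index of the case that fired; recv is the value read.
	chosen, recv, _ := reflect.Select(cases)
	return clauses[chosen].handler(recv.String())
}

func main() {
	clients := make(chan string, 1)
	barbers := make(chan string, 1)
	clients <- "Alice"

	result := selectf(
		clause{clients, func(v string) string { return "client walked in: " + v }},
		clause{barbers, func(v string) string { return "barber available: " + v }},
	)
	fmt.Println(result) // client walked in: Alice
}
```

The slice of clauses plays the role that the array-map plays in the Clojure version: once the pairs are in an ordinary data structure, dispatching to the right handler is a lookup.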


/* ---[ Next ]--- */

In the next entry we'll implement some more interesting CSP examples in Go and Clojure and think about the pros and cons of using lamina vs. go-lightly.


/* ---[ Resources ]--- */

All of the code in this blog series, including the Go and lamina example code, is in the go-lightly project on GitHub.

Lamina library: https://github.com/ztellman/lamina

The Go examples are from Rob Pike's Google I/O 2012 talk, Go Concurrency Patterns.

Alexey Kachayev wrote down the Go code that Pike used in the 2012 Google IO presentation, which otherwise doesn't seem to have been made available. Alexey published them as gists. They won't compile out of the box, so I've been modifying them, but wanted to link to his gists: https://gist.github.com/3124594.

Alexey also then brainstormed on ways to implement these examples in Clojure using the lamina library. Those gists are at: https://gist.github.com/3146759
