How to count the number of values on a core.async channel

Back to SIG Clojure Tutorials

Let's say you need to create a buffered channel called job-queue, and at some time in the future, you would like to check on how many values are currently sitting in its buffer. Here is a nifty solution that combines atoms, delays, and transducers to achieve our goals.

(require '[clojure.core.async :refer [chan >!! 

This is how the magic works: When we pass a transducer to the chan function, it will be run on each new value that is put (>! or >!!) onto this channel. So each time you put a value on the job-queue, swap! is called to increment the counter in job-queue-size. You can always check its value with the usual @job-queue-size incantation. Note that the transducer function actually returns a delay object, which contains an expression body that is not executed until the delay is dereferenced. So when you take (

Here's how you might use it:

(>!! job-queue :foo)
(>!! job-queue :bar)
(>!! job-queue :baz)

@job-queue-size ;=> 3

@( 0