More fun with Clojure lazy sequences


Recently I was writing some code to ingest new entries for my church search website. The application is written in Clojure, but the downloading is done by an external script that slowly downloads new entries and saves them as files in a directory. I wanted to be able to specify a range of entries to download, have the script run, and then have my application ingest the files that the script generates. I wanted this to work concurrently, with multiple downloaders and multiple ingesters running at the same time. And I wanted all of this to be triggered from Clojure.

This is basically a concurrency problem, so it’s a good thing I was doing it in Clojure. The most obvious solution would be to launch multiple downloaders and then have a thread that watches the directory, feeding file paths to ingester threads using channels and core.async. Unfortunately, I’m stuck using an older version of Clojure for this project for the moment, and core.async is not available. Thankfully, lazy sequences can be used as a substitute for channels in a pinch.

What I did was write a function that takes a directory path and a list of files and returns an object (a Clojure map) that tracks the presence of those files in that directory. I then have a function that returns all of the files added to the directory since the last time it was checked. A ref is used to keep this state consistent between threads. The code looks like this:

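;; Assumed setup (not shown in the original): `file` comes from
;; clojure.java.io; `intersection` and `difference` come from clojure.set.
(defn directory-watcher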
  "Given a directory and a list of files to wait for in that directory,
  returns an object to store the found state of files in that directory."
  [directory files]
  (let [files (set (map (partial file directory) files))]
    {:directory (file directory)
     :search-files (ref files)
     :original-files files}))

(defn get-new-files 
  "Returns any new files in the search set found since the last time 
  files were found and returned. Updates the watcher so that returned
  files are no longer looked for."
  [directory-watcher]
  (dosync
   (let [files (set (file-seq (:directory directory-watcher)))
         new-files (intersection files @(:search-files directory-watcher))]
     (alter (:search-files directory-watcher) difference new-files)
     new-files)))

So now multiple threads can call get-new-files and each receive some files to operate on without interfering with the files other threads are working on. The problem is that because files are returned in batches, it’s easy for one thread to take all of the files if they are added to the directory at once. The solution is to have it return one file at a time:

(defn get-new-file 
  "Returns a file in the search set if one is found in the directory.
  Removes file from the search set if it is found."
  [directory-watcher]
  (dosync
   (let [files (set (file-seq (:directory directory-watcher)))
         new-file (first (intersection files @(:search-files directory-watcher)))]
     (alter (:search-files directory-watcher) difference #{new-file})
     new-file)))

From that I created a blocking version:

(defn blocking-get-new-file 
  "Returns a file in the search set or blocks until one is found in the
  directory. Will return nil when the search set is empty."
  [directory-watcher]
  (loop []
    (when-not (empty? @(:search-files directory-watcher))
      (if-let [file (get-new-file directory-watcher)]
        file
        (do
          (Thread/sleep 10) ;Throttling
          (recur))))))

From this it’s trivial to create a function that returns a lazy sequence of new files as they arrive in the directory:

(defn directory-watcher-seq 
  "Returns a lazy sequence of files as they appear in a directory watched by
  a provided directory watcher. Multiple sequences can be created from the
  same watcher and they will return different files but will all return when 
  the search set is exhausted."
  [directory-watcher]
  (when-let [file (blocking-get-new-file directory-watcher)]
    (cons file (lazy-seq (directory-watcher-seq directory-watcher)))))

If you pass the same directory-watcher to multiple calls to directory-watcher-seq, you’ll get multiple lazy sequences, each watching the same directory for the same files and guaranteed not to return the same file more than once between them. Once every file in the search set has been found, the sequences end.
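To make the sharing concrete, here is a minimal sketch of two consumer threads draining one watcher. It is an illustration under assumptions, not the code from the actual application: the namespace name, the temporary directory, the file names, and the ingest-all helper are made up for the example, futures stand in for whatever threads you really use, and the functions from above are repeated in condensed form only so that the snippet runs on its own.

```clojure
(ns watcher-demo
  (:use [clojure.java.io :only [file]]
        [clojure.set :only [intersection difference]]))

;; Condensed copies of the functions above, so this snippet is self-contained.
(defn directory-watcher [directory files]
  (let [files (set (map (partial file directory) files))]
    {:directory (file directory)
     :search-files (ref files)
     :original-files files}))

(defn get-new-file [watcher]
  (dosync
   (let [present (set (file-seq (:directory watcher)))
         found (first (intersection present @(:search-files watcher)))]
     (alter (:search-files watcher) difference #{found})
     found)))

(defn blocking-get-new-file [watcher]
  (loop []
    (when-not (empty? @(:search-files watcher))
      (if-let [found (get-new-file watcher)]
        found
        (do (Thread/sleep 10)
            (recur))))))

(defn directory-watcher-seq [watcher]
  (when-let [found (blocking-get-new-file watcher)]
    (cons found (lazy-seq (directory-watcher-seq watcher)))))

;; Hypothetical setup: an empty temporary directory and four expected files.
(def dir (doto (java.io.File/createTempFile "watcher-demo" "")
           (.delete)
           (.mkdir)))
(def names ["a.txt" "b.txt" "c.txt" "d.txt"])
(def watcher (directory-watcher dir names))

(defn ingest-all
  "Stand-in for real ingestion: collects the names of the files it receives."
  [watcher]
  (doall (map #(.getName ^java.io.File %) (directory-watcher-seq watcher))))

;; Start two consumers before any file exists; both wait in the polling loop.
(def consumers (doall (repeatedly 2 #(future (ingest-all watcher)))))

;; Stand-in for the external download script: write the files one by one.
(doseq [n names]
  (spit (file dir n) "entry")
  (Thread/sleep 20))

;; Each future finishes once the search set is empty.
(def results (doall (map deref consumers)))
(println results)
```

How the four names are split between the two consumers depends on timing, but every name should show up exactly once across both results.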

The reason this works is that, as I said at the beginning, the state of the directory is saved in a ref. Every time get-new-file finds a file, it updates a ref holding the set of files yet to be found.[1] This way, no other call to get-new-file, or to any of the functions built on it, will return the same file.
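The same guarantee can be seen without touching the file system. The following is a small sketch, assuming a plain set of numbers in place of files; pending, claim-one!, and claimed-seq are hypothetical names invented for this illustration, and the four futures are just one way to create contention.

```clojure
;; Hypothetical stand-in for the watcher's :search-files ref:
;; a ref holding the items nobody has claimed yet.
(def pending (ref (set (range 100))))

(defn claim-one!
  "Atomically removes one item from the set in the ref and returns it.
  Returns nil once the set is empty."
  [unclaimed]
  (dosync
   (when-let [item (first @unclaimed)]
     (alter unclaimed disj item)
     item)))

(defn claimed-seq
  "Lazy sequence of items claimed from the ref; ends when nothing is left."
  [unclaimed]
  (lazy-seq
   (when-let [item (claim-one! unclaimed)]
     (cons item (claimed-seq unclaimed)))))

;; Four workers drain the same ref concurrently, each through its own sequence.
(def workers (doall (repeatedly 4 #(future (doall (claimed-seq pending))))))
(def claimed (doall (mapcat deref workers)))

;; Total items claimed, and how many of them are distinct.
(println (count claimed) (count (distinct claimed)))
```

Both counts come out as 100: if two transactions try to claim the same item, only one commits and the other is retried against the updated set. Because a transaction can be retried, its body should be free of side effects; reading the directory listing inside dosync, as get-new-file does, is harmless to repeat.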

What I think is neat is that in a lot of other programming languages[2] this would have been handled with callbacks. Using a lazy sequence is, I feel, a much cleaner abstraction. Not only can I write a simple loop to handle the files as they come, I also don’t have to put any thread-handling code in the directory watcher itself. This makes for a very nice separation of concerns.

  1. This ref is found in the directory-watcher object. 
  2. JavaScript, cough cough

Last update: 13/07/2014
