Recently I was writing some code to ingest new entries for my church search website. The application is written in Clojure, but the downloading is done with an external script that downloads new entries and saves them as files in a directory, but at a slow rate. I wanted to be able to specify a range of entries to download, and have the script run and then have my application ingest the files that the script generates. I wanted this to work concurrently, with multiple downloaders and multiple ingesters running at the same time. And I wanted all of this to be triggered from Clojure.
This is basically a concurrency problem so it’s a good thing I was trying to do it in using Clojure. The most obvious solution would be to launch multiple downloaders, and then have a thread which watches the directory, feeding filepaths to ingester threads using channels and core.async. Unfortunately, I’m stuck using an older version of Clojure for this project for the moment and core.async is not available. Thankfully, lazy sequences can be used as a substitute for channels in a pinch.
What I did was make a function which takes a directory path and a list of files and returns an object (a Clojure map) which tracks the presence of those files in that directory. I then have a function which returns all of the files added to the directory since the last time the directory had been checked. A reference was used to keep this straight between threads. The code looks like this:
(defn directory-watcher
"Given a directory and a list of files to wait for in that directory,
returns an object to store the found state of files in that directory."
[directory files]
(let [files (set (map (partial file directory) files))]
{:directory (file directory)
:search-files (ref files)
:original-files files}))
(defn get-new-files
"Returns any new files in the search set found since the last time
files were found and returned. Updates the watcher so that returned
files are no longer looked for."
[directory-watcher]
(dosync
(let [files (set (file-seq (:directory directory-watcher)))
new-files (intersection files @(:search-files directory-watcher))]
(alter (:search-files directory-watcher) difference new-files)
new-files)))
So now multiple threads can call get-new-files
and receive some files on which to operate without interfering with files on which other threads are operating. The problem is that because files are returned in batches, it’s easy for one thread to take all of the files at once if they are added to the directory at once. Is to have it return one file at a time.
(defn get-new-file
"Returns a file in the search set if one is found in the directory.
Removes file from the search set if it is found."
[directory-watcher]
(dosync
(let [files (set (file-seq (:directory directory-watcher)))
new-file (first (intersection files @(:search-files directory-watcher)))]
(alter (:search-files directory-watcher) difference #{new-file})
new-file)))
From that I created a blocking version:
(defn blocking-get-new-file
"Returns a file in the search set or blocks until one is found in the
directory. Will return nil when the search set is empty."
[directory-watcher]
(loop []
(when-not (empty? @(:search-files directory-watcher))
(if-let [file (get-new-file directory-watcher)]
file
(do
(Thread/sleep 10) ;Throttling
(recur))))))
From this it’s trivial to create a function which returns a lazy sequence of new files as they arrive in the directory:
(defn directory-watcher-seq
"Returns a lazy sequence of files as they appear in a directory watched by
a provided directory watcher. Multiple sequences can be created from the
same watcher and they will return different files but will all return when
the search set is exhausted."
[directory-watcher]
(when-let [file (blocking-get-new-file directory-watcher)]
(cons file (lazy-seq (directory-watcher-seq directory-watcher)))))
If you pass the same directory-watcher
to multiple calls to directory-watcher-seq
, you’ll get multiple lazy sequences each watching the same directory for the same files and guaranteed not to return the same file more than once between them. When there are no more files in the directory, they’ll return.
The reason this works is because, as I said at the beginning, the state of the directory is saved in a ref. Every time get-new-file
finds a file, it updates a ref holding the list of files yet to be found.1 This way, no other call to get-new-file
or any of it’s similar functions, will return the same file.
What I think is neat is that in a lot of other programming languages 2 this would have been taken care of using callbacks. Using a lazy sequence, I feel, is a much cleaner abstraction. Not only can I write a simple loop to handle the files as they come, I don’t have to put any thread handling code in the directory watcher code itself. This makes for a very nice separation of concerns.