Queues

All functions are located in the com.biffweb namespace.

use-queues

(use-queues
 {:keys
  [biff/modules
   biff/plugins
   biff/features
   biff.queues/enabled-ids
   biff.queues/stop-timeout],
  :as ctx})
A Biff component that creates in-memory queues and thread pools to consume them.

modules:      A var containing a collection of module maps. Each module map
              may contain a :queues key, which contains a collection of queue
              config maps. See below. Required.
plugins:      Deprecated. Alias for modules. modules takes precedence.
features:     Deprecated. Alias for modules. modules and plugins take
              precedence.
enabled-ids:  An optional set of queue IDs. If non-nil, only queues in the
              set will be created.
stop-timeout: When shutting down, the number of milliseconds to wait before
              killing any running job consumers. Default 10000.
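
As a rough illustration of the keys above, the relevant part of a system map might look like the following. The module contents, the no-op consumer, and the timeout value are invented for this sketch; in a real app these keys normally come from your own configuration and modules.

```clojure
;; Hypothetical consumer that does nothing; stands in for real job handling.
(defn noop-consumer [ctx] nil)

;; Hypothetical collection of module maps. Only the :queues key matters here.
(def modules
  [{:queues [{:id :echo   :consumer #'noop-consumer}
             {:id :emails :consumer #'noop-consumer :n-threads 2}]}])

;; The part of the system map that use-queues reads.
(def ctx
  {:biff/modules #'modules             ; a var, per the description above
   :biff.queues/enabled-ids #{:echo}   ; only the :echo queue would be created
   :biff.queues/stop-timeout 5000})    ; wait 5 seconds on shutdown, not 10
```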

Adds a :biff/queues key to the system map which contains a map of queue IDs
to `java.util.concurrent.BlockingQueue`s. Use (.add queue {...}) to submit a
job. Jobs are arbitrary maps. Each queue will be consumed by a fixed-size
thread pool.
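
The following sketch shows what submitting a job with .add looks like. The ctx map here is a hand-built stand-in, not one produced by use-queues, and LinkedBlockingQueue is used only so the example runs on its own.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Stand-in for the system map after use-queues has run (assumption: a real
;; map is built by Biff and holds whatever queue-fn returned).
(def ctx {:biff/queues {:echo (LinkedBlockingQueue.)}})

(def queue (get-in ctx [:biff/queues :echo]))

;; Submit a job. Any map will do.
(def accepted? (.add queue {:foo "bar"}))

;; A consumer thread would normally take the job; here it is taken by hand.
(def job (.take queue))
```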

Queue config maps may have the following keys:

id:        Used as a key in the :biff/queues map. Required.
consumer:  A one-argument function that is called whenever a job is
           available. It receives the system map with the :biff/job and
           :biff/queue keys set. biff/merge-context is called on the system
           map before the consumer is called. Required.
n-threads: The number of worker threads in the consumer thread pool for this
           queue. Default 1.
queue-fn:  A zero-arg function that returns a BlockingQueue instance. By
           default, a PriorityBlockingQueue with a custom comparator will be
           used: jobs may include a :biff/priority key, which defaults to 10.
           Lower numbers have higher priority.
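
To make the priority ordering concrete, here is a minimal sketch of a queue-fn that orders jobs by :biff/priority. The function name and comparator are hypothetical; this illustrates the idea and is not necessarily how Biff's default comparator is written.

```clojure
(import '(java.util.concurrent PriorityBlockingQueue))

;; Hypothetical queue-fn: zero arguments, returns a BlockingQueue.
;; Jobs without :biff/priority are treated as priority 10.
(defn priority-queue-fn []
  (PriorityBlockingQueue.
   11 ; initial capacity; the queue grows as needed
   (fn [a b]
     (compare (:biff/priority a 10)
              (:biff/priority b 10)))))

(def q (priority-queue-fn))

(.add q {:task "normal"})                    ; priority defaults to 10
(.add q {:task "later"  :biff/priority 20})
(.add q {:task "urgent" :biff/priority 1})

;; Jobs come out lowest number first.
(def order [(:task (.take q)) (:task (.take q)) (:task (.take q))])
```

A function like this could be supplied as :queue-fn in a queue config map when a different ordering or a different BlockingQueue implementation is wanted.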

Example:

(defn echo-consumer [{:keys [biff/job] :as ctx}]
  (prn :echo job)
  (when-some [callback (:biff/callback job)]
    (callback job)))

(def module
  {:queues [{:id :echo
             :consumer #'echo-consumer}]})

(biff/submit-job ctx :echo {:foo "bar"})
=>
(out) :echo {:foo "bar"}
true

@(biff/submit-job-for-result ctx :echo {:foo "bar"})
=>
(out) :echo {:foo "bar", :biff/callback #function[...]}
{:foo "bar", :biff/callback #function[...]}

submit-job

(submit-job ctx queue-id job)
Convenience function that calls (.add (get-in ctx [:biff/queues queue-id]) job)
and returns its result (true when the job is accepted).

submit-job-for-result

(submit-job-for-result
 {:keys [biff.queues/result-timeout],
  :or {result-timeout 20000},
  :as ctx}
 queue-id
 job)
Like submit-job, but returns a promise which will contain the result of the
queue operation.

A :biff/callback key, containing a one-argument function, will be added to
the job. The consumer function must pass a result to this callback. If the
result is an Exception object, it will be re-thrown on the current thread. An
exception will also be thrown if the callback function is not called within
the number of milliseconds specified by result-timeout.
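
Because the consumer must always pass something to the callback, one possible pattern is to catch exceptions and hand the exception object back as the result. The sketch below assumes a hypothetical process function and calls the consumer by hand, with plain promises standing in for what submit-job-for-result sets up.

```clojure
;; Hypothetical work function; stands in for whatever the job really does.
(defn process [job]
  (if (:fail job)
    (throw (ex-info "job failed" {:job job}))
    (assoc (dissoc job :biff/callback) :status :done)))

;; Consumer that always reports back: a result on success, the exception
;; object on failure. Jobs submitted without a callback are still processed.
(defn reporting-consumer [{:keys [biff/job]}]
  (let [callback (or (:biff/callback job) (constantly nil))
        result   (try
                   (process job)
                   (catch Exception e e))]
    (callback result)))

;; Exercise the consumer directly, outside of any queue.
(def ok (promise))
(reporting-consumer {:biff/job {:foo "bar" :biff/callback #(deliver ok %)}})

(def failed (promise))
(reporting-consumer {:biff/job {:fail true :biff/callback #(deliver failed %)}})
```

With a consumer written this way, a failing job surfaces as an exception to the caller instead of leaving it waiting until result-timeout elapses.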