taoensso.carmine.message-queue

Carmine-backed Clojure message queue. Redis does all the heavy lifting.
The message circle architecture used here is simple and reliable, with
reasonable throughput but at best mediocre latency.

Redis keys:
  * carmine:mq:<qname>:messages     - hash, {mid mcontent}.
  * carmine:mq:<qname>:locks        - hash, {mid lock-expiry-time}.
  * carmine:mq:<qname>:backoffs     - hash, {mid backoff-expiry-time}.
  * carmine:mq:<qname>:nattempts    - hash, {mid attempt-count}.
  * carmine:mq:<qname>:mid-circle   - list, rotating list of mids (next on right).
  * carmine:mq:<qname>:done         - set, awaiting gc, requeue, etc.
  * carmine:mq:<qname>:requeue      - set, for `allow-requeue?` option.
  * carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide
                                      (every-worker) polling backoff.
  * carmine:mq:<qname>:ndry-runs    - int, number of times worker(s) have
                                      burnt through queue w/o work to do.
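
As a rough illustration of the key layout above, the sketch below builds the
key name for a given queue and suffix. `qkey` is a hypothetical helper written
for this example only; it is not part of the namespace's API.

```clojure
(require '[clojure.string :as str])

(defn qkey
  "Hypothetical helper: builds \"carmine:mq:<qname>:<suffix>\"."
  [qname suffix]
  (str/join ":" ["carmine" "mq" (name qname) (name suffix)]))

(qkey "my-queue" :messages)     ; => "carmine:mq:my-queue:messages"
(qkey "my-queue" :eoq-backoff?) ; => "carmine:mq:my-queue:eoq-backoff?"
```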

See http://antirez.com/post/250 for basic implementation details.

clear-queues

(clear-queues conn-opts & qnames)

dequeue

IMPLEMENTATION DETAIL: Use `worker` instead.
Rotates queue's mid-circle and processes next mid. Returns:
  nil             - If msg GC'd, locked, or set to backoff.
  "eoq-backoff" - If circle uninitialized or end-of-circle marker reached.
  [<mid> <mcontent> <attempt>] - If message should be (re)handled now.
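
The three reply shapes above can be illustrated with a small in-memory model
of the mid-circle. This is only a sketch under simplifying assumptions: the
real work happens atomically in Redis, and the state map, `eoq-marker`,
`rotate-circle`, and `dequeue-sketch` are hypothetical names invented for this
example. The circle is a vector whose rightmost element is next.

```clojure
(def eoq-marker "end-of-circle") ; hypothetical end-of-circle marker

(defn rotate-circle
  "Models RPOPLPUSH of a list onto itself: pop rightmost, push on the left."
  [circle]
  (if (empty? circle)
    circle
    (into [(peek circle)] (pop circle))))

(defn dequeue-sketch
  "Returns [new-state reply] where reply is nil, \"eoq-backoff\",
  or [mid mcontent attempt]."
  [{:keys [circle messages locks backoffs nattempts] :as state} now-ms lock-ms]
  (let [mid   (peek circle)
        state (update state :circle rotate-circle)]
    (cond
      ;; Circle uninitialized or end-of-circle marker reached
      (or (nil? mid) (= mid eoq-marker)) [state "eoq-backoff"]
      ;; Message already GC'd
      (not (contains? messages mid))     [state nil]
      ;; Still locked by a handler
      (> (get locks mid 0) now-ms)       [state nil]
      ;; Still backing off
      (> (get backoffs mid 0) now-ms)    [state nil]
      ;; Message should be (re)handled now: lock it and count the attempt
      :else
      (let [attempt (inc (get nattempts mid 0))]
        [(-> state
             (assoc-in [:locks mid] (+ now-ms lock-ms))
             (assoc-in [:nattempts mid] attempt))
         [mid (get messages mid) attempt]]))))
```

Polling this model three times with a single message yields the message, then
"eoq-backoff", then nil (the message is still locked).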

enqueue

(enqueue qname message)
(enqueue qname message {:keys [unique-message-id allow-requeue? initial-backoff-ms]})
Pushes given message (any Clojure datatype) to named queue and returns unique
message id or {:carmine.mq/error <message-status>}. Options:
  * unique-message-id  - Specify an explicit message id (e.g. message hash) to
                         perform a de-duplication check. If unspecified, a
                         unique id will be auto-generated.
  * allow-requeue?     - When true, allow buffered escrow-requeue for a
                         message in the :locked or :done-with-backoff state.
  * initial-backoff-ms - Initial backoff in millis.
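
A minimal usage sketch follows, assuming a Redis server reachable with default
connection options and that `enqueue` is called inside Carmine's `wcar`
context. `message-hash` and `enqueue-error` are hypothetical helpers written
for this example; hashing `pr-str` output is just one possible way to derive
an id and may differ for equal maps that print in a different key order. The
Redis calls sit in a `comment` form so the helpers can be loaded on their own.

```clojure
(import 'java.security.MessageDigest)

(defn message-hash
  "Hypothetical helper: hex SHA-256 of the message's printed form,
  usable as a :unique-message-id for de-duplication."
  [message]
  (let [input  (.getBytes (pr-str message) "UTF-8")
        digest (.digest (MessageDigest/getInstance "SHA-256") input)]
    (apply str (map #(format "%02x" %) digest))))

(defn enqueue-error
  "Hypothetical helper: returns the message status reported by a failed
  enqueue, or nil when the result is a plain message id."
  [result]
  (when (map? result) (:carmine.mq/error result)))

(comment
  ;; Requires Carmine on the classpath and a running Redis server.
  (require '[taoensso.carmine :as car]
           '[taoensso.carmine.message-queue :as car-mq])

  (def conn-opts {:pool {} :spec {}}) ; assumed default connection options

  (let [msg    {:user-id 42 :action :send-email}
        result (car/wcar conn-opts
                 (car-mq/enqueue "my-queue" msg
                   {:unique-message-id  (message-hash msg)
                    :initial-backoff-ms 5000}))]
    (if-let [status (enqueue-error result)]
      (println "Not enqueued, existing message status:" status)
      (println "Enqueued with mid:" result))))
```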

exp-backoff

(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor], :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
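
To make the idea concrete, here is one way a randomized binary exponential
backoff with these options could be written. It is a sketch, not the
library's implementation: `exp-backoff-sketch` is a hypothetical name, and the
jitter formula is an assumption. Only the n<=36 cap, the option names, and the
default factor of 1000 come from the signature above.

```clojure
(defn exp-backoff-sketch
  "Returns a backoff in ms that roughly doubles with each attempt."
  ([n-attempt] (exp-backoff-sketch n-attempt nil))
  ([n-attempt {min-ms :min max-ms :max factor :factor :or {factor 1000}}]
   (let [n (min (long n-attempt) 36)            ; cap the exponent at 36
         b (Math/pow 2 n)                       ; binary exponential base
         ;; Assumed jitter: a random point in [b/2, b) scaled by factor
         t (long (* (+ b (rand b)) 0.5 factor))
         t (if min-ms (max (long min-ms) t) t)  ; clamp from below
         t (if max-ms (min (long max-ms) t) t)] ; clamp from above
     t)))

(exp-backoff-sketch 3)             ; some value in [4000, 8000)
(exp-backoff-sketch 10 {:max 500}) ; => 500
```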

handle1

(handle1 conn-opts qname handler [mid mcontent attempt :as poll-reply])
Implementation detail!

IWorker

protocol

members

start

(start this)

stop

(stop this)

make-dequeue-worker

(make-dequeue-worker pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]})
DEPRECATED: Use `worker` instead.

message-status

Returns current message status, one of:
  :queued               - Awaiting handler.
  :queued-with-backoff  - Awaiting rehandling.
  :locked               - Currently with handler.
  :locked-with-requeue  - Currently with handler, will requeue on success.
  :done-awaiting-gc     - Finished handling, awaiting GC.
  :done-with-backoff    - Finished handling, awaiting dedupe timeout.
  nil                   - Already GC'd or invalid message id.
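
When reacting to these statuses in application code, it can help to group
them by phase. The grouping below is a sketch: `status-phase` and the phase
names are hypothetical and simply restate the table above.

```clojure
(defn status-phase
  "Hypothetical helper: maps a message status to a coarse phase."
  [status]
  (case status
    (:queued :queued-with-backoff)         :waiting
    (:locked :locked-with-requeue)         :in-progress
    (:done-awaiting-gc :done-with-backoff) :finished
    nil                                    :gone-or-invalid
    :unknown))

(status-phase :locked) ; => :in-progress
(status-phase nil)     ; => :gone-or-invalid
```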

monitor-fn

(monitor-fn qname max-circle-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue's mid-circle exceeds
the prescribed size. A backoff timeout can be provided to rate-limit this
warning.
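
The behaviour described above can be sketched as a closure that tracks the
time of its last warning. This is an in-process approximation only:
`monitor-fn-sketch` and its `warn!` callback are hypothetical, and the
library's own rate limiting may work differently (for example, across
workers). The argument map mirrors the `:monitor` option of `worker`.

```clojure
(defn monitor-fn-sketch
  "Returns a monitor fn that calls (warn! msg) when the mid-circle exceeds
  max-circle-size, at most once per warn-backoff-ms."
  [qname max-circle-size warn-backoff-ms warn!]
  (let [last-warn-ms (atom nil)]
    (fn [{:keys [mid-circle-size]}]
      (let [now  (System/currentTimeMillis)
            prev @last-warn-ms]
        (when (and mid-circle-size
                   (> mid-circle-size max-circle-size)
                   ;; Rate limit: skip if we warned too recently
                   (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                   ;; Only one thread wins the right to warn
                   (compare-and-set! last-warn-ms prev now))
          (warn! (format "Queue %s: mid-circle size %d exceeds %d"
                         qname mid-circle-size max-circle-size))
          true)))))
```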

queue-status

(queue-status conn-opts qname)

worker

(worker conn-opts qname & [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads throttle-ms auto-start], :as opts, :or {handler (fn [args] (timbre/infof "%s" args) {:status :success}), monitor (monitor-fn qname 1000 (enc/ms :hours 6)), lock-ms (enc/ms :hours 1), nthreads 1, throttle-ms 200, eoq-backoff-ms exp-backoff, auto-start true}}])
Returns a threaded worker to poll for and handle messages `enqueue`'d to
named queue. Options:
 :handler        - (fn [{:keys [qname mid message attempt]}]) that throws an
                   exception or returns
                   {:status     <#{:success :error :retry}>
                    :throwable  <Throwable>
                    :backoff-ms <retry-or-dedupe-backoff-ms>}.
 :monitor        - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}])
                   called on each worker loop iteration. Useful for queue
                   monitoring/logging. See also `monitor-fn`.
 :lock-ms        - Max time a handler may keep a message before the handler
                   is considered fatally stalled and the message is
                   re-queued. Must be high enough to prevent double handling.
 :eoq-backoff-ms - Thread sleep period each time end of queue is reached.
                   May instead be a (fn [ndry-runs]) -> ms; n<=5 will be used.
                   Sleep synchronized for all queue workers.
 :nthreads       - Number of synchronized worker threads to use.
 :throttle-ms    - Thread sleep period between each poll.
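
A hedged usage sketch: the handler below returns each of the documented
status maps, and the `comment` form shows how it might be passed to `worker`.
`process!`, `handle-message`, the retry limit of 3, and the option values are
assumptions made for illustration. The worker calls need Carmine on the
classpath and a running Redis server, so they are kept out of the loadable
code.

```clojure
(defn process!
  "Hypothetical business logic: throws for messages marked :fail."
  [message]
  (when (= message :fail)
    (throw (ex-info "Processing failed" {:message message})))
  :ok)

(defn handle-message
  "Handler for `worker`: retries with a growing backoff, then gives up."
  [{:keys [qname mid message attempt]}]
  (try
    (process! message)
    {:status :success}
    (catch Exception e
      (if (< attempt 3)
        {:status :retry :backoff-ms (* 1000 attempt)}
        {:status :error :throwable e}))))

(comment
  (require '[taoensso.carmine.message-queue :as car-mq])

  (def conn-opts {:pool {} :spec {}}) ; assumed default connection options

  (def my-worker
    (car-mq/worker conn-opts "my-queue"
      {:handler     handle-message
       :lock-ms     60000
       :nthreads    2
       :throttle-ms 200}))

  ;; IWorker protocol members
  (car-mq/stop my-worker)
  (car-mq/start my-worker))
```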