taoensso.carmine.message-queue
Carmine-backed Clojure message queue. All heavy lifting is done by Redis.
The message circle architecture used here is simple and reliable, and has
reasonable throughput but at best mediocre latency.
Redis keys:
* carmine:mq:<qname>:messages - hash, {mid mcontent}.
* carmine:mq:<qname>:locks - hash, {mid lock-expiry-time}.
* carmine:mq:<qname>:backoffs - hash, {mid backoff-expiry-time}.
* carmine:mq:<qname>:nattempts - hash, {mid attempt-count}.
* carmine:mq:<qname>:mid-circle - list, rotating list of mids (next on right).
* carmine:mq:<qname>:done - set, awaiting gc, requeue, etc.
* carmine:mq:<qname>:requeue - set, for `allow-requeue?` option.
* carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide
  (every-worker) polling backoff.
* carmine:mq:<qname>:ndry-runs - int, number of times worker(s) have
  burnt through queue w/o work to do.
Ref. http://antirez.com/post/250 for basic implementation details.
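The key layout above can be sketched as a small pure helper. This is only an illustration of the naming scheme: `qkey-sketch` and `queue-keys-sketch` are hypothetical names, not functions exported by this namespace.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper: builds "carmine:mq:<qname>:<suffix>" as listed above.
(defn qkey-sketch
  [qname suffix]
  (str/join ":" ["carmine" "mq" (name qname) (name suffix)]))

;; The per-queue key suffixes named in the list above.
(def key-suffixes
  [:messages :locks :backoffs :nattempts :mid-circle
   :done :requeue :eoq-backoff? :ndry-runs])

;; Hypothetical helper: map of suffix -> full Redis key for one queue.
(defn queue-keys-sketch
  [qname]
  (into {} (map (fn [s] [s (qkey-sketch qname s)])) key-suffixes))
```

For example, `(qkey-sketch "my-queue" :locks)` yields `"carmine:mq:my-queue:locks"`.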
clear-queues
(clear-queues conn-opts & qnames)
dequeue
IMPLEMENTATION DETAIL: Use `worker` instead.
Rotates queue's mid-circle and processes next mid. Returns:
nil - If msg GC'd, locked, or set to backoff.
"eoq-backoff" - If circle uninitialized or end-of-circle marker reached.
[<mid> <mcontent> <attempt>] - If message should be (re)handled now.
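The three reply shapes above could be told apart roughly as follows. This is a sketch for illustration only: `classify-poll-reply` and its `:action` keywords are hypothetical and do not appear in the library.

```clojure
;; Hypothetical helper: turns a `dequeue` reply into a tagged map.
(defn classify-poll-reply
  [reply]
  (cond
    ;; nil: message was GC'd, locked, or set to backoff, so nothing to do.
    (nil? reply) {:action :skip}

    ;; "eoq-backoff": circle uninitialized or end-of-circle marker reached.
    (= reply "eoq-backoff") {:action :eoq-backoff}

    ;; [<mid> <mcontent> <attempt>]: message should be (re)handled now.
    (and (vector? reply) (= 3 (count reply)))
    (let [[mid mcontent attempt] reply]
      {:action :handle :mid mid :message mcontent :attempt attempt})

    ;; Anything else is outside the documented contract.
    :else {:action :unknown :reply reply}))
```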
enqueue
(enqueue qname message)
(enqueue qname message {:keys [unique-message-id allow-requeue? initial-backoff-ms]})
Pushes given message (any Clojure datatype) to named queue and returns unique
message id or {:carmine.mq/error <message-status>}. Options:
* unique-message-id  - Specify an explicit message id (e.g. message hash) to
                       perform a de-duplication check. If unspecified, a
                       unique id will be auto-generated.
* allow-requeue?     - When true, allow buffered escrow-requeue for a
                       message in the :locked or :done-with-backoff state.
* initial-backoff-ms - Initial backoff in millis.
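Since `enqueue` returns either a message id or an error map, callers may want to normalize the reply. A minimal sketch, assuming the reply shapes described above: `enqueue-result` is a hypothetical helper, and the connection options, queue name and message inside the `comment` form are placeholders that need a running Redis server.

```clojure
;; Hypothetical helper: normalizes an `enqueue` reply.
(defn enqueue-result
  [reply]
  (if-let [status (and (map? reply) (:carmine.mq/error reply))]
    {:ok? false :status status}   ; e.g. a duplicate unique-message-id
    {:ok? true  :mid reply}))     ; reply is the message id

(comment
  ;; Not evaluated on load; requires Carmine on the classpath and Redis.
  (require '[taoensso.carmine :as car :refer [wcar]]
           '[taoensso.carmine.message-queue :as car-mq])

  (def conn-opts {:pool {} :spec {}}) ; placeholder connection options

  (enqueue-result
    (wcar conn-opts
      (car-mq/enqueue "my-queue" {:task :send-email}
        {:unique-message-id  "email-42"
         :initial-backoff-ms 5000}))))
```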
exp-backoff
(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor], :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
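To make the option keys concrete, here is one way a randomized binary exponential backoff with `min`, `max` and `factor` could be written. It is a sketch under assumptions (in particular the jitter scheme), not the library's definition; `exp-backoff-sketch` is a hypothetical name.

```clojure
;; Hypothetical sketch of a binary exponential backoff with jitter.
;; Result lies in [0.5 * 2^n * factor, 2^n * factor) before clamping.
(defn exp-backoff-sketch
  ([n-attempt] (exp-backoff-sketch n-attempt nil))
  ([n-attempt {lo :min hi :max factor :factor :or {factor 1000}}]
   (let [b (Math/pow 2 n-attempt)               ; 2^n as a double
         t (long (* (+ b (rand b)) 0.5 factor)) ; add jitter, scale by factor
         t (if lo (max (long lo) t) t)          ; clamp to lower bound, if any
         t (if hi (min (long hi) t) t)]         ; clamp to upper bound, if any
     t)))
```

With the default factor of 1000, `(exp-backoff-sketch 3)` falls between 4000 and 8000, and `(exp-backoff-sketch 10 {:max 5000})` is clamped to 5000.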
handle1
(handle1 conn-opts qname handler [mid mcontent attempt :as poll-reply])
make-dequeue-worker
(make-dequeue-worker pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]})
DEPRECATED: Use `worker` instead.
message-status
Returns current message status, one of:
:queued - Awaiting handler.
:queued-with-backoff - Awaiting rehandling.
:locked - Currently with handler.
:locked-with-requeue - Currently with handler, will requeue on success.
:done-awaiting-gc - Finished handling, awaiting GC.
:done-with-backoff - Finished handling, awaiting dedupe timeout.
nil - Already GC'd or invalid message id.
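One plausible reading of how these statuses relate to the Redis keys listed for this namespace is sketched below as a pure function. The precedence of the checks is an assumption made for illustration, and `message-status-sketch` and its input keys are hypothetical; the library's own logic may differ.

```clojure
;; Hypothetical sketch: derives a status from per-message facts.
;; Input keys mirror the Redis structures: existence in :messages,
;; membership in :done and :requeue, and expiry times (ms) from
;; :locks and :backoffs.
(defn message-status-sketch
  [{:keys [exists? done? requeue? lock-expiry backoff-expiry now]}]
  (let [locked?  (boolean (and lock-expiry    (< now lock-expiry)))
        backoff? (boolean (and backoff-expiry (< now backoff-expiry)))]
    (cond
      (not exists?) nil
      done?         (if backoff? :done-with-backoff :done-awaiting-gc)
      locked?       (if requeue? :locked-with-requeue :locked)
      backoff?      :queued-with-backoff
      :else         :queued)))
```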
monitor-fn
(monitor-fn qname max-circle-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue's mid-circle exceeds
the prescribed size. A backoff timeout can be provided to rate-limit this
warning.
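The idea of a size warning with a rate limit can be sketched in plain Clojure. This version keeps the last-warning time in a local atom, which is an assumption made to keep the example self-contained; `monitor-sketch` and the `warn!` callback are hypothetical names.

```clojure
;; Hypothetical sketch of a rate-limited monitor fn.
;; `warn!` is a caller-supplied (fn [mid-circle-size]) side effect.
(defn monitor-sketch
  [max-circle-size warn-backoff-ms warn!]
  (let [last-warn (atom nil)] ; ms timestamp of the last warning, or nil
    (fn [{:keys [mid-circle-size]}]
      (when (> mid-circle-size max-circle-size)
        (let [now  (System/currentTimeMillis)
              prev @last-warn]
          ;; Warn only if no warning was issued within the backoff window,
          ;; and only if this thread wins the race to record the new time.
          (when (and (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                     (compare-and-set! last-warn prev now))
            (warn! mid-circle-size)))))))
```

The returned fn takes the same map that the `worker` `:monitor` option is documented to receive, so it could be passed there directly.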
queue-status
(queue-status conn-opts qname)
worker
(worker conn-opts qname & [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads throttle-ms auto-start], :as opts, :or {handler (fn [args] (timbre/infof "%s" args) {:status :success}), monitor (monitor-fn qname 1000 (enc/ms :hours 6)), lock-ms (enc/ms :hours 1), nthreads 1, throttle-ms 200, eoq-backoff-ms exp-backoff, auto-start true}}])
Returns a threaded worker to poll for and handle messages `enqueue`'d to
named queue. Options:
:handler        - (fn [{:keys [qname mid message attempt]}]) that throws an ex
                  or returns {:status     <#{:success :error :retry}>
                              :throwable  <Throwable>
                              :backoff-ms <retry-or-dedupe-backoff-ms>}.
:monitor        - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}])
                  called on each worker loop iteration. Useful for queue
                  monitoring/logging. See also `monitor-fn`.
:lock-ms        - Max time a handler may keep a message before the handler is
                  considered fatally stalled and the message is re-queued.
                  Must be sufficiently high to prevent double handling.
:eoq-backoff-ms - Thread sleep period each time end of queue is reached.
                  Can also be a (fn [ndry-runs]) -> ms, in which case n<=5
                  will be used. Sleep is synchronized across all queue workers.
:nthreads       - Number of synchronized worker threads to use.
:throttle-ms    - Thread sleep period between each poll.
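A handler matching the contract above might look like the following sketch. The message shape (a map with an optional `:flaky?` key), the retry policy, and the connection options are all hypothetical; the `comment` form shows how such a handler could be passed to `worker` and needs a running Redis server.

```clojure
;; Hypothetical handler: returns one of the documented status maps.
(defn example-handler
  [{:keys [mid message attempt]}]
  (cond
    ;; Unusable message: report an error with a Throwable attached.
    (nil? message)
    {:status :error :throwable (ex-info "Empty message" {:mid mid})}

    ;; Simulated transient failure: ask for a retry with a growing backoff.
    (and (:flaky? message) (< attempt 3))
    {:status :retry :backoff-ms (* 1000 attempt)}

    :else
    {:status :success}))

(comment
  ;; Not evaluated on load; requires Carmine on the classpath and Redis.
  (require '[taoensso.carmine.message-queue :as car-mq])

  (def conn-opts {:pool {} :spec {}}) ; placeholder connection options

  (def my-worker
    (car-mq/worker conn-opts "my-queue"
      {:handler     example-handler
       :nthreads    2
       :throttle-ms 200
       :lock-ms     (* 60 60 1000)}))

  (car-mq/stop my-worker))
```

Keeping the handler a pure function of its argument map makes it easy to exercise without Redis, which is why the worker wiring is kept separate here.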