liteq v1.0.1

Lightweight Portable Message Queue Using 'SQLite'

Temporary and permanent message queues for R. Built on top of 'SQLite' databases. 'SQLite' provides locking, and makes it possible to detect crashed consumers. Crashed jobs can be automatically marked as "failed", or put in the queue again, potentially a limited number of times.

Readme

Installation

source("https://install-github.me/r-lib/liteq")

Introduction

liteq implements a serverless message queue system in R. It can handle multiple databases, and each database can contain multiple queues.

liteq uses SQLite to store a database of queues, and uses other, temporary SQLite databases for locking and for finding crashed workers (see below).

Usage

Basic usage

library(liteq)

In the following, we create a queue in a temporary queue database. The database will be removed when the R session ends.

db <- tempfile()
q <- ensure_queue("jobs", db = db)
q
#> liteq queue 'jobs'
list_queues(db)
#> [[1]]
#> liteq queue 'jobs'

Note that ensure_queue() is idempotent: if you call it again on the same database, it returns the queue that was created previously. So it is safe to call it multiple times, even from multiple processes. In the case of multiple processes, the locking mechanism eliminates race conditions.
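As a small illustration of the idempotency described above, and of one database holding several queues, the following sketch calls ensure_queue() twice for the same name and once for a second name. The queue name "results" is made up for the example.

```r
library(liteq)

db <- tempfile()

# Calling ensure_queue() twice with the same name does not create a duplicate.
q1       <- ensure_queue("jobs", db = db)
q1_again <- ensure_queue("jobs", db = db)

# A second queue ("results" is an arbitrary example name) in the same database.
q2 <- ensure_queue("results", db = db)

# list_queues() returns a list with one entry per queue in the database.
queues <- list_queues(db)
length(queues)
```

If the calls behave as described, the database should contain two queues, not three.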

To publish a message in the queue, call publish() on the queue object:

publish(q, title = "First message", message = "Hello world!")
publish(q, title = "Second message", message = "Hello again!")
list_messages(q)
#>   id          title status
#> 1  1  First message  READY
#> 2  2 Second message  READY

A liteq message has a title, which is a string scalar, and the message body itself is a string scalar as well. To use more complex data types in messages, you need to serialize them using the serialize() function (set ascii to TRUE!), or convert them to JSON with the jsonlite package.
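A minimal sketch of the serialize() route, using base R only. The helper names encode_body() and decode_body() are hypothetical and not part of liteq; they just wrap the conversion between an R object and a string scalar.

```r
# Hypothetical helpers (not part of liteq): turn an R object into a
# string scalar and back, using ASCII serialization.
encode_body <- function(x) {
  rawToChar(serialize(x, connection = NULL, ascii = TRUE))
}
decode_body <- function(s) {
  unserialize(charToRaw(s))
}

job  <- list(id = 42L, script = "fit.R", weights = c(0.5, 2))
body <- encode_body(job)

# The encoded body is a single string, so it can be used as a message body.
is.character(body) && length(body) == 1L

restored <- decode_body(body)
identical(restored, job)
```

The encoded string could then be passed as publish(q, title = "job 42", message = body), and the consumer would call decode_body(msg$message) to recover the object.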

Two functions are available to consume a message from a queue. try_consume() returns immediately, either with a message (liteq_message object), or NULL if the queue is empty. The consume() function blocks if the queue is empty, and waits until a message appears in it.

msg <- try_consume(q)
msg
#> liteq message from queue 'jobs':
#>   First message (12 B)

The title and the message body are available as fields of the message object:

msg$title
#> [1] "First message"
msg$message
#> [1] "Hello world!"

When a consumer is done processing a message, it must call ack() on the message object to notify the queue that it is safe to remove the message. If the consumer fails to process a message, it can call nack() (negative acknowledgement) on the message object. The status of the message is then set to "FAILED". Failed messages can be removed from the queue, or put back in the queue again, depending on the application.

ack(msg)
list_messages(q)
#>   id          title status
#> 1  2 Second message  READY
msg2 <- try_consume(q)
nack(msg2)
list_messages(q)
#>   id          title status
#> 1  2 Second message FAILED
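The ack()/nack() protocol above can be sketched as a small worker loop. The handle() function is a hypothetical job handler invented for this example; the call requeue_failed_messages(q) with only the queue argument is assumed to requeue all failed messages.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "good", message = "21")
publish(q, title = "bad", message = "oops")

# Hypothetical job handler: doubles a numeric body, errors on anything else.
handle <- function(msg) {
  x <- suppressWarnings(as.numeric(msg$message))
  if (is.na(x)) stop("cannot parse message body: ", msg$message)
  x * 2
}

results <- list()
repeat {
  msg <- try_consume(q)
  if (is.null(msg)) break              # no READY messages left
  tryCatch({
    results[[msg$title]] <- handle(msg)
    ack(msg)                           # success: message can be removed
  }, error = function(e) {
    nack(msg)                          # failure: message is marked FAILED
  })
}

after_loop <- list_messages(q)         # only the failed message should remain

# Depending on the application, failed messages can be retried or dropped
# (remove_failed_messages(q) would drop them instead).
requeue_failed_messages(q)
after_requeue <- list_messages(q)
```

Wrapping the handler in tryCatch() keeps the worker alive when a single job fails, and guarantees that every consumed message gets either an ack() or a nack().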

The queue has no READY messages left now, so try_consume() returns NULL:

try_consume(q)
#> NULL
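Because try_consume() returns NULL instead of blocking, it can be used to build a wait with an upper time limit, somewhere between try_consume() and the blocking consume(). The helper consume_with_timeout() below is a hypothetical sketch, not part of liteq, and its timeout and interval arguments (in seconds) are assumptions of this example.

```r
library(liteq)

# Hypothetical helper (not part of liteq): poll try_consume() until a
# message arrives or `timeout` seconds have passed. Returns NULL on timeout.
consume_with_timeout <- function(queue, timeout = 5, interval = 0.2) {
  deadline <- Sys.time() + timeout
  repeat {
    msg <- try_consume(queue)
    if (!is.null(msg) || Sys.time() >= deadline) return(msg)
    Sys.sleep(interval)
  }
}

db <- tempfile()
q <- ensure_queue("jobs", db = db)

# Nothing has been published yet, so this gives up after about half a second.
empty <- consume_with_timeout(q, timeout = 0.5)

publish(q, title = "Late message", message = "Hello!")
msg <- consume_with_timeout(q, timeout = 0.5)
if (!is.null(msg)) ack(msg)
```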

Crashed workers

If a worker crashes without calling either ack() or nack() on a message, then this message will be put back in the queue the next time a message is requested from the queue.

To make this possible, each delivered message keeps an open connection to a lock file, and crashed workers are found by the absence of this open connection. In R, this basically means that a worker is considered crashed if its R process no longer holds a reference to the message object.

Note that this also means that running many workers at the same time can reach the maximum number of open connections allowed by R or the operating system.

License

MIT © Gábor Csárdi

Functions in liteq

Name                     Description
db_consume               Consume a message from a message queue
db_create_queue          Create a queue
create_queue             Create a queue in a database
db_ack                   Positive or negative acknowledgement
db_try_consume           Try to consume a message from the queue
default_db               The name of the default database
nack                     Report that the work on a message has failed
publish                  Publish a message in a queue
ack                      Acknowledge that the work on a message has finished successfully
consume                  Consume a message from a queue
remove_failed_messages   Remove failed messages from the queue
requeue_failed_messages  Requeue failed messages
liteq                    Lightweight Portable Message Queue Using 'SQLite'
make_message             Make a message object
try_consume              Consume a message if there is one available
delete_queue             Delete a queue
ensure_db                Ensure that the DB exists and has the right columns
ensure_queue             Make sure that a queue exists
list_failed_messages     List failed messages in a queue
list_messages            List all messages in a queue
list_queues              List all queues in a database
Details

License MIT + file LICENSE
LazyData true
URL https://github.com/r-lib/liteq#readme
BugReports https://github.com/r-lib/liteq/issues
RoxygenNote 6.0.1
Encoding UTF-8
NeedsCompilation no
Packaged 2017-10-20 17:39:48 UTC; gaborcsardi
Repository CRAN
Date/Publication 2017-10-20 19:47:15 UTC
