dipsaus (version 0.0.3)

queue: Create R object queue.

Description

Provides five types of queue that fit different use cases.

Usage

session_queue(map = fastmap::fastmap())

rds_queue(path = tempfile())

text_queue(path = tempfile())

qs_queue(path = tempfile())

redis_queue(name = rand_string())

Arguments

map

a map object created by fastmap::fastmap()

path

directory path where queue data should be stored

name

character, queue name. If two queues have the same name, their data will be shared.

Value

An R6 instance that inherits AbstractQueue

Details

Five types of queue are implemented. They all inherit from class AbstractQueue, but they differ in their use cases and in their backend implementations.

session_queue

A session queue takes a fastmap object. All objects are stored in the current R session, so the queue cannot be accessed from other processes, including the parent process. Its purpose is to share data across different environments and to store global variables, as long as they share the same map object. If you need a queue that can be shared by different processes, see the other queue types.
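As a minimal sketch of the sharing behaviour described above, assuming the 'dipsaus' and 'fastmap' packages are installed, two queue handles built on the same map should see the same items. The variable names (`shared_map`, `producer`, `consumer`) are illustrative, and only calls that appear elsewhere on this page (`push`, `pop`) are used.

```r
# Sketch only: requires the 'dipsaus' and 'fastmap' packages.
if (requireNamespace("dipsaus", quietly = TRUE) &&
    requireNamespace("fastmap", quietly = TRUE)) {
  shared_map <- fastmap::fastmap()

  # Two handles backed by the same map object, in the same R session
  producer <- dipsaus::session_queue(map = shared_map)
  consumer <- dipsaus::session_queue(map = shared_map)

  producer$push(value = 42, message = "answer")

  # The consumer handle pops from the same underlying map
  item <- consumer$pop()
  if (length(item)) {
    print(item[[1]]$value)
  }
}
```

Because the data live in the map rather than in either handle, a forked or separate R process would not see these items.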

rds_queue

An 'RDS' queue uses the file system to store values. Each value is stored separately in its own '.rds' file. Unlike a session queue, an 'RDS' queue can be shared across different R processes. However, because each value is stored as a file, cleaning a queue can be slow, so rds_queue is recommended for storing large values. If the values are not large in RAM, text_queue and redis_queue are recommended.
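The idea of keeping one '.rds' file per value can be sketched in base R. This is not the dipsaus implementation: `push_value()` and `pop_value()` are hypothetical helpers, there is no locking, and the file-naming scheme is an assumption made for illustration.

```r
# Hypothetical file-backed FIFO; illustrates the idea only, no locking.
queue_dir <- tempfile()
dir.create(queue_dir)

push_value <- function(dir, value) {
  files <- list.files(dir, pattern = "\\.rds$")
  ids <- as.integer(sub("\\.rds$", "", files))
  next_id <- if (length(ids)) max(ids) + 1L else 1L
  # Zero-padded names sort in insertion order
  saveRDS(value, file.path(dir, sprintf("%08d.rds", next_id)))
  invisible(next_id)
}

pop_value <- function(dir) {
  files <- sort(list.files(dir, pattern = "\\.rds$", full.names = TRUE))
  if (!length(files)) return(NULL)
  value <- readRDS(files[[1]])   # oldest file first
  file.remove(files[[1]])
  value
}

push_value(queue_dir, "first")
push_value(queue_dir, mtcars)

first <- pop_value(queue_dir)                 # the value pushed first
remaining <- length(list.files(queue_dir))    # one file left on disk

unlink(queue_dir, recursive = TRUE)
```

Any process that knows `queue_dir` could read the same files, which is why a directory-based queue can cross process boundaries, and why clearing many small files is comparatively slow.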

qs_queue

A 'qs' queue uses the package 'qs' as its backend. This queue is very similar to rds_queue, but is designed especially for large values. For example, pushing 1GB of data to qs_queue will be about 100 times faster than using rds_queue, and text_queue will almost certainly fail. However, unlike with rds_queue, the stored data cannot be read directly by base R because they are compressed binary files. In addition, qs_queue is heavier than text_queue.

text_queue

A 'text' queue uses the file system to store values. Like rds_queue, it can be shared across multiple processes as long as the queues use the same file directory. Unlike rds_queue, text_queue serializes values as strings and stores them in a text table. It is much lighter but more limited. For example, all other queue types can store environments and functions. Although text_queue can also store complicated structures, doing so is much slower (and could freeze the whole process). Therefore, for such values it is highly recommended to use redis_queue, qs_queue, or rds_queue if speed is not the major concern.
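To see why string serialization gets expensive for larger objects, the sketch below compares base R's binary and ASCII serialization of the same numeric vector. Whether text_queue uses exactly this mechanism is an assumption; the comparison only illustrates the general cost of text encodings.

```r
# Compare binary and text (ASCII) serialization sizes in base R.
x <- rnorm(1e5)

binary_size <- length(serialize(x, connection = NULL))
text_size   <- length(serialize(x, connection = NULL, ascii = TRUE))

# Sizes in bytes; the text form is noticeably larger for doubles
print(c(binary = binary_size, text = text_size))

# The text form still round-trips, up to numerical tolerance
restored <- unserialize(serialize(x, connection = NULL, ascii = TRUE))
```

For short character messages the difference is negligible, which is the case where a text-based queue is at its best.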

redis_queue

A 'Redis' queue uses the free, open-source software `Redis` and the R package 'RcppRedis' as its backend. Unlike a session queue, a 'Redis' queue can be shared across sessions. Unlike 'text' and 'rds' queues, a 'Redis' queue stores data in memory, which can give significant speed-ups. To use redis_queue, you need to install `Redis` on your computer.

  • On Mac: use `brew install redis` to install and `brew services start redis` to start the service

  • On Linux: use `sudo apt-get install redis-server` to install, `sudo systemctl enable redis-server.service` to enable the service at boot, and `sudo systemctl start redis-server.service` to start it

  • On Windows: Download from https://github.com/dmajkic/redis/downloads and double click 'redis-server.exe'
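Before creating a redis_queue, it can help to check from R whether a `redis-server` executable is visible on the search path. This is a small base R sketch and only an approximation: it does not confirm that the service is running, and on Windows a downloaded 'redis-server.exe' may not be on the path at all.

```r
# Look for the redis-server executable on the system search path.
redis_path <- Sys.which("redis-server")
redis_installed <- nzchar(unname(redis_path))

if (redis_installed) {
  message("Found redis-server at: ", redis_path)
} else {
  message("redis-server was not found on the search path.")
}
```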

Examples

# NOT RUN {
# ----------------------Basic Usage ----------------------

library(dipsaus)

# Define a path to your queue.
queue <- qs_queue(path = tempfile())

# Reset
queue$reset()

# Check if the queue is corrupted.
queue$validate()

# You have not pushed any messages yet.
# Let's say two parallel processes (A and B) are sharing this queue.
# Process A sends Process B some messages.
# You can only send character vectors.
queue$list()

# Start push
# Push a normal message
queue$push(value = 'Do this', message = 'hello')

# Push a quo
v <- 16
queue$push(value = rlang::quo({
  sqrt(!!v)
}), message = 'eval')

# Push a large object
queue$push(value = rnorm(100000), message = 'sum')

# Push only message
queue$push(value = NULL, message = 'stop')

# Check queued messages.
# The `time` is a formatted character string from `Sys.time()`
# indicating when the message was pushed. `key` is unique key
# generated from `time`, `value` and queue internal `ID`
queue$list()

# Number of messages in the queue.
queue$count

# Number of messages that were ever queued.
queue$total

# Return and remove the first two messages that were added.
queue$pop(2)

# Number of messages in the queue.
queue$count

# List what's left
queue$list()

val1 <- queue$pop()
val2 <- queue$pop()

# Destroy the queue's files altogether.
queue$destroy()

# }
# NOT RUN {
  # Once destroyed, validate will raise error
  queue$validate()
# }
# NOT RUN {
# ----------------------Cross-Process Usage ----------------------

# Cross session example

queue <- text_queue()

# In another process
f <- future::future({
  process_pid <- Sys.getpid()
  queue$push(process_pid)
})

# In current process, get pid
# wait 0.2 seconds, making sure the queue has at least one item
Sys.sleep(0.2)
msg <- queue$pop()
msg[[1]]

# ----------------------Shiny Example ----------------------
library(shiny)
library(promises)
library(dipsaus)

ui <- fluidPage(
  fluidRow(
    column(
      12,
      actionButtonStyled('do', 'Launch Process', type = 'primary'),
      hr(),
      textOutput('text')
    )
  )
)
server <- function(input, output, session) {
  make_forked_clusters()
  env = environment()
  progress = NULL
  queue <- rds_queue()
  timer = reactiveTimer(50)
  local_data = reactiveValues(text = '')
  observe({
    timer()
    message = queue$pop()
    if(length(message)){
      instruction = message[[1]]$value
      eval_dirty(instruction, env = env)
    }
  })

  output$text <- renderText({
    print(local_data$text)
    return(local_data$text)
  })

  observeEvent(input$do, {
    updateActionButtonStyled(session, 'do', disabled = TRUE)
    if(!is.null(progress)){
      progress$close()
    }
    progress <<- progress2('Analysis [A]', max = 10)

    future::future({
      lapply(1:10, function(ii){
        queue$push(rlang::quo(
          progress$inc(!!sprintf('Processing %d', ii))
        ))
        Sys.sleep(0.2)
      })
      return(Sys.getpid())
    }) %...>% (function(v){
      queue$push(rlang::quo({
        progress$close()
        updateActionButtonStyled(session, 'do', disabled = FALSE)
      }))
      queue$push(rlang::quo({
        local_data$text = !!sprintf('Finished in process (PID): %s', v)
      }))
    })
    NULL
  }, ignoreInit = TRUE)

  session$onSessionEnded(function(){
    queue$destroy()
  })
}

if( interactive() ){
  shinyApp(ui, server)
}

# }
