# NOT RUN {
# ----------------------Basic Usage ----------------------

# Create a queue stored under a temporary path.
queue <- qs_queue(path = tempfile())

# Reset the queue, then check whether it is corrupted.
queue$reset()
queue$validate()

# Nothing has been pushed yet.
# Scenario: two parallel processes (A and B) share this queue, and
# process A sends process B some messages. The pushes below send a
# string, a quosure, a numeric vector and `NULL` as `value`.
queue$list()

# ---- Pushing ----
# A plain message.
queue$push(value = "Do this", message = "hello")

# A quosure; `v` is unquoted with `!!` when the quosure is built.
v <- 16
queue$push(
  value = rlang::quo({
    sqrt(!!v)
  }),
  message = "eval"
)

# A large object.
queue$push(value = rnorm(100000), message = "sum")

# A message without any value attached.
queue$push(value = NULL, message = "stop")

# ---- Inspecting ----
# `time` is a formatted character string from `Sys.time()` telling when
# the message was pushed; `key` is a unique key generated from `time`,
# `value` and the queue's internal `ID`.
queue$list()

# How many messages are in the queue right now.
queue$count

# How many messages were ever queued.
queue$total

# ---- Popping ----
# Return and remove the two messages that were added first.
queue$pop(2)

# Count, then list, what is left.
queue$count
queue$list()

# Pop two more messages, one per call.
val1 <- queue$pop()
val2 <- queue$pop()

# Destroy the queue's files altogether.
queue$destroy()
# }
# NOT RUN {
# Once the queue has been destroyed, `validate()` raises an error.
queue$validate()
# }
# NOT RUN {
# ----------------------Cross-Process Usage ----------------------
# Cross-session example: one queue is used from two processes.
queue <- text_queue()

# In another process (subject to the active `future` plan), push that
# process's PID into the queue.
f <- future::future({
  process_pid <- Sys.getpid()
  queue$push(process_pid)
})

# Wait for the future to finish rather than sleeping for a fixed 0.2
# seconds: a fixed delay is a race if the worker is slow to start, and
# `pop()` would then find nothing. `value()` blocks until the future is
# resolved and re-signals any error raised inside it.
invisible(future::value(f))

# In the current process, read the PID back from the queue.
message <- queue$pop()
message[[1]]
# ----------------------Shiny Example ----------------------
library(shiny)
library(promises)
library(dipsaus)
# Page layout: a single full-width column holding the launch button,
# a horizontal rule and the text output.
ui <- fluidPage(
  fluidRow(
    column(
      width = 12,
      actionButtonStyled("do", "Launch Process", type = "primary"),
      hr(),
      textOutput("text")
    )
  )
)
# Server logic: a background future reports its progress to the main
# session by pushing quoted instructions into a queue, which the main
# session polls on a timer and evaluates.
server <- function(input, output, session) {
  make_forked_clusters()

  # Instructions popped from the queue are evaluated in this function's
  # environment, where `progress`, `local_data` and `session` are defined.
  env <- environment()
  progress <- NULL
  queue <- rds_queue()
  timer <- reactiveTimer(50)
  local_data <- reactiveValues(text = "")

  # Each time the timer fires, pop from the queue and evaluate the
  # instruction carried by the first popped item, if there is one.
  observe({
    timer()
    msg <- queue$pop()
    if (length(msg) > 0) {
      eval_dirty(msg[[1]]$value, env = env)
    }
  })

  # Echo the current text to the console and render it.
  output$text <- renderText({
    txt <- local_data$text
    print(txt)
    txt
  })

  observeEvent(input$do, {
    updateActionButtonStyled(session, "do", disabled = TRUE)

    # Close the progress object left over from a previous launch.
    if (!is.null(progress)) {
      progress$close()
    }
    # `<<-` assigns to the `progress` defined in the server function,
    # not to a new variable local to this handler.
    progress <<- progress2("Analysis [A]", max = 10)

    # Worker: push ten progress instructions, pausing 0.2 seconds after
    # each, then return the worker's PID.
    future::future({
      for (step in 1:10) {
        queue$push(rlang::quo(
          progress$inc(!!sprintf("Processing %d", step))
        ))
        Sys.sleep(0.2)
      }
      Sys.getpid()
    }) %...>% (function(pid) {
      # Once the future resolves, queue the clean-up and the final text.
      queue$push(rlang::quo({
        progress$close()
        updateActionButtonStyled(session, "do", disabled = FALSE)
      }))
      queue$push(rlang::quo({
        local_data$text <- !!sprintf("Finished in process (PID): %s", pid)
      }))
    })

    NULL
  }, ignoreInit = TRUE)

  # Destroy the queue when the session ends.
  session$onSessionEnded(function() {
    queue$destroy()
  })
}
# Launch the app only when R is running interactively.
if (interactive()) {
  shinyApp(ui = ui, server = server)
}
# }
# Run the code above in your browser using DataLab