ipc (version 0.1.4)

Queue: A Class containing a producer and consumer

Description

Creates a Queue object for inter-process communication. Its members producer and consumer are the main entry points for sending and receiving messages respectively.

Usage

queue(
  source = defaultSource()$new(),
  producer = Producer$new(source),
  consumer = Consumer$new(source)
)

Arguments

source

The source for reading and writing the queue.

producer

The producer for the source.

consumer

The consumer of the source.
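As an illustration of how the three arguments fit together, the sketch below builds the source once and hands the same object to both the producer and the consumer, mirroring the defaults shown under Usage. It assumes the ipc package is installed and that a message fired and consumed within a single session is evaluated in the calling environment; the variable names (src, q, greeting, has_roles) are illustrative only.

```r
library(ipc)

# One shared source, so the producer writes where the consumer reads.
src <- defaultSource()$new()
q <- queue(
  source = src,
  producer = Producer$new(src),
  consumer = Consumer$new(src)
)

# Check that the public fields hold the objects that were passed in.
has_roles <- inherits(q$producer, "Producer") &&
  inherits(q$consumer, "Consumer")

# Send an expression and process it in the same session (assumed to
# assign `greeting` in the calling environment).
greeting <- NULL
q$producer$fireEval(greeting <- "hello")
q$consumer$consume()

# Clean up the queue's backing storage.
q$destroy()
```

Passing a producer and consumer built on different sources would leave the two ends disconnected, which is why the defaults derive both from the same source argument.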

Public fields

producer

A Producer object.

consumer

A Consumer object.

Methods


Method new()

Create a Queue object.

Usage

Queue$new(source, prod, cons)

Arguments

source

The source to use for communication.

prod

A Producer object.

cons

A Consumer object.


Method destroy()

Clean up the object after use.

Usage

Queue$destroy()


Method clone()

The objects of this class are cloneable with this method.

Usage

Queue$clone(deep = FALSE)

Arguments

deep

Whether to make a deep clone.

Details

This function creates a queue object for communication between different R processes, including forks of the same process. By default, it uses the txtq package as its backend. Technically, the information is sent through temporary files, created in a new directory inside the session-specific temporary folder (see tempfile). This requires that the new directory is writable. That is normally the case, but if Sys.umask forbids writing, the communication fails with an error.
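To check the writability requirement before creating a queue, a pre-flight test along the following lines may help. It is a minimal base-R sketch, not part of ipc: it creates a scratch directory inside the session temporary folder, tests write permission, and round-trips one line through a file. The names queue_dir, probe and round_trip are hypothetical, and the directory ipc actually creates may be named differently.

```r
# Hypothetical pre-flight check using base R only; not an ipc function.
queue_dir <- tempfile(pattern = "queue_check_")  # a path inside tempdir()
created <- dir.create(queue_dir)                 # TRUE if the directory was made

# file.access() returns 0 when the requested access (mode 2 = write) is allowed.
writable <- created && file.access(queue_dir, mode = 2) == 0

# Round-trip a line through a file, as a file-backed queue would need to.
probe <- file.path(queue_dir, "probe.txt")
if (writable) writeLines("ping", probe)
round_trip <- writable && identical(readLines(probe), "ping")

# Remove the scratch directory again.
unlink(queue_dir, recursive = TRUE)
```

If writable or round_trip comes back FALSE, inspecting the current mask with Sys.umask() is a reasonable first step.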

Examples

if (FALSE) {
library(ipc)
library(parallel)
library(future)
library(promises)
plan(multisession)

q <- queue()

# communicate from main session to child
fut <- future({
  for(i in 1:1000){
    Sys.sleep(.1)
    q$consumer$consume()
  }
})

q$producer$fireEval(stop("Stop that child"))
cat(try(value(fut)))

# Communicate from child to main session
j <- 0
fut <- future({
  for(i in 1:10){
    Sys.sleep(.2)

    # set j in the main session, substituting i into the expression
    q$producer$fireEval(j <- i, env=list(i=i))
  }
})

while(j < 10){
  q$consumer$consume() # collect and execute assignments
  cat("j = ", j, "\n")
  Sys.sleep(.1)
}

fut <- future({
  for(i in 1:10){
    Sys.sleep(.2)

    # print i in the main session, substituting i into the expression
    q$producer$fireEval(print(i), env=list(i=i))
  }
})

q$consumer$start() # execute `consume` at regular intervals

# clean up
q$destroy()

}