R6 abstract class to build other subclasses
which launch and manage workers.
workersData frame of worker information.
nameName of the launcher.
seconds_intervalSee crew_launcher().
seconds_launchSee crew_launcher().
seconds_idleSee crew_launcher().
seconds_wallSee crew_launcher().
seconds_exitSee crew_launcher().
tasks_maxSee crew_launcher().
tasks_timersSee crew_launcher().
reset_globalsSee crew_launcher().
reset_packagesSee crew_launcher().
reset_optionsSee crew_launcher().
garbage_collectionSee crew_launcher().
launch_maxSee crew_launcher().
tlsSee crew_launcher().
untilNumeric of length 1, time point when throttled unlocks.
new()Launcher constructor.
crew_class_launcher$new(
name = NULL,
seconds_interval = NULL,
seconds_launch = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
tls = NULL
)nameSee crew_launcher().
seconds_intervalSee crew_launcher().
seconds_launchSee crew_launcher().
seconds_idleSee crew_launcher().
seconds_wallSee crew_launcher().
seconds_exitSee crew_launcher().
tasks_maxSee crew_launcher().
tasks_timersSee crew_launcher().
reset_globalsSee crew_launcher().
reset_packagesSee crew_launcher().
reset_optionsSee crew_launcher().
garbage_collectionSee crew_launcher().
launch_maxSee crew_launcher().
tlsSee crew_launcher()
An R6 object with the launcher.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
validate()Validate the launcher.
crew_class_launcher$validate()NULL (invisibly).
settings()List of arguments for mirai::daemon().
crew_class_launcher$settings(socket)socketCharacter of length 1, websocket address of the worker to launch.
List of arguments for mirai::daemon().
call()Create a call to crew_worker() to
help create custom launchers.
crew_class_launcher$call(socket, launcher, worker, instance)socketSocket where the worker will receive tasks.
launcherCharacter of length 1, name of the launcher.
workerPositive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instanceCharacter of length 1 to uniquely identify the instance of the worker.
Character of length 1 with a call to crew_worker().
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)
socketsFor testing purposes only.
Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.
NULL (invisibly).
NULL if the launcher is not started. Otherwise, a tibble
with one row per crew worker and the following columns:
worker: integer index of the worker.
launches: number of times the worker was launched. Each launch
occurs at a different websocket because the token at the end of the
URL is rotated before each new launch.
assigned: cumulative number of tasks assigned, reported by
mirai::daemons() and summed over all
completed instances of the worker. Does not reflect the activity
of the currently running instance of the worker.
complete: cumulative number of tasks completed, reported by
mirai::daemons() and summed over all
completed instances of the worker. Does not reflect the activity
of the currently running instance of the worker.
socket: current websocket URL of the worker.
done()Get done workers.
crew_class_launcher$done(daemons = NULL)daemonsmirai daemons matrix. For testing only. Users
should not set this.
A worker is "done" if it is launched and inactive.
A worker is "launched" if launch() was called
and the worker websocket has not been rotated since.
If a worker is currently online, then it is not inactive.
If a worker is not currently online, then it is inactive
if and only if (1) either it connected to the current
websocket at some point in the past,
or (2) seconds_launch seconds elapsed since launch.
Integer index of inactive workers.
rotate()crew_class_launcher$rotate(index)indexInteger index of a worker.
Rotate a websocket.
NULL (invisibly).
tally()Update the cumulative assigned and complete statistics.
Used to detect backlogged workers with more assigned than complete tasks. If terminated, these workers need to be relaunched until the backlog of assigned tasks is complete.
crew_class_launcher$tally(daemons = NULL)daemonsmirai daemons matrix. For testing only. Users
should not set this.
NULL (invisibly).
unlaunched()Get workers available for launch.
crew_class_launcher$unlaunched(n = Inf)nMaximum number of worker indexes to return.
Integer index of workers available for launch.
backlogged()List non-launched backlogged workers.
crew_class_launcher$backlogged()Integer vector of worker indexes.
resolved()List non-launched non-backlogged workers.
crew_class_launcher$resolved()nMaximum number of worker indexes to return.
Integer vector of worker indexes.
launch()Launch a worker.
crew_class_launcher$launch(index)indexPositive integer of length 1, index of the worker to launch.
NULL (invisibly).
throttle()Throttle repeated calls.
crew_class_launcher$throttle()TRUE to throttle, FALSE to continue.
scale()Auto-scale workers out to meet the demand of tasks.
crew_class_launcher$scale(demand, throttle = FALSE)demandNumber of unresolved tasks.
throttleLogical of length 1, whether to delay auto-scaling
until the next auto-scaling request at least
self$client$seconds_interval seconds from the original request.
The idea is similar to shiny::throttle() except that crew does not
accumulate a backlog of requests. The technique improves robustness
and efficiency.
NULL (invisibly)
terminate()Terminate one or more workers.
crew_class_launcher$terminate(index = NULL)indexInteger vector of the indexes of the workers
to terminate. If NULL, all current workers are terminated.
NULL (invisibly).
terminate_worker()Abstract method.
crew_class_launcher$terminate_worker(handle)handleA handle object previously
returned by launch_worker() which allows the termination
of the worker.
Does not actually terminate a worker. This method is a placeholder, and its presence allows manual worker termination to be optional.
NULL (invisibly).
Other class:
crew_class_client,
crew_class_controller_group,
crew_class_controller,
crew_class_schedule,
crew_class_tls
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)
Run the code above in your browser using DataLab