pbdZMQ (version 0.2-3)

New Poll Functions: New Poll Functions


New poll functions


zmq.poll2(socket, type, timeout = -1L, MC = .pbd_env$ZMQ.MC)
zmq.poll2.interrupt(socket, type, timeout = -1L, MC = .pbd_env$ZMQ.MC)
zmq.poll2.get.revents(index = 1L, poller)


a vector of ZMQ sockets
a vector of socket types corresponding to socket argument
timeout for poll, see ZeroMQ manual for details
a message control, see ZMQ.MC() for details
an index of ZMQ poll items to obtain revents
a pointer of ZMQ poller


zmq.poll2() returns a ZMQ code, an errno, and a pollitem pointer. No error/warning/interrupt in this R function, but some error/warning/interrupt may catch by the C function zmq_poll(). See ZeroMQ manual for details.zmq.poll.get.revents.new() returns the revent type.


zmq.poll2() initials ZMQ poll items given ZMQ socket's and ZMQ poll type's. Both socket and type are in vectors of the same length, while socket contains socket pointers and type contains types of poll. See ZMQ.PO() for the possible values of type. ZMQ defines several poll types and utilize them to poll multiple sockets.

zmq.poll2.interrupt() call zmq.poll2() and raise an interrupt signal if ret$pollret[1] == -1 and ret$pollret[2] == 4.

zmq.poll2.get.revents() obtains revent types from ZMQ poll item by the input index..


ZeroMQ/4.1.0 API Reference: http://api.zeromq.org/4-1:_start

Programming with Big Data in R Website: http://r-pbd.org/

See Also

zmq.recv(), zmq.send().


Run this code
## Not run: 
# ### Multiple socket reader as in the ZeroMQ guide.
# # SHELL> Rscript wuserver.r &
# # SHELL> Rscript taskvent.r &
# # SHELL> Rscript mspoller2.r
# # SHELL> rm weather.ipc
# library(pbdZMQ, quietly = TRUE)
# ### Initial.
# context <- zmq.ctx.new()
# receiver <- zmq.socket(context, .pbd_env$ZMQ.ST$PULL)
# zmq.connect(receiver, "tcp://localhost:5557")
# subscriber <- zmq.socket(context, .pbd_env$ZMQ.ST$SUB)
# zmq.connect(subscriber, "tcp://localhost:5556")
# zmq.setsockopt(subscriber, .pbd_env$ZMQ.SO$SUBSCRIBE, "20993")
# ### Process messages from both sockets.
# cat("Press Ctrl+C or Esc to stop mspoller.\n")
# i.rec <- 0
# i.sub <- 0
# while(TRUE){
#   ### Set poller.
#   poller <- zmq.poll2(c(receiver, subscriber),
#                       c(.pbd_env$ZMQ.PO$POLLIN, .pbd_env$ZMQ.PO$POLLIN))
#   ### Check receiver.
#   if(bitwAnd(zmq.poll2.get.revents(1, poller),
#              .pbd_env$ZMQ.PO$POLLIN)){
#     ret <- zmq.recv(receiver)
#     if(ret$len != -1){
#       cat("task ventilator:", ret$buf, "at", i.rec, "\n")
#       i.rec <- i.rec + 1
#     }
#   }
#   ### Check subscriber.
#   if(bitwAnd(zmq.poll2.get.revents(2, poller),
#              .pbd_env$ZMQ.PO$POLLIN)){
#     ret <- zmq.recv(subscriber)
#     if(ret$len != -1){
#       cat("weather update:", ret$buf, "at", i.sub, "\n")
#       i.sub <- i.sub + 1
#     }
#   }
#   if(i.rec >= 5 & i.sub >= 5){
#     break
#   }
#   Sys.sleep(runif(1, 0.5, 1))
# }
# ### Finish.
# zmq.close(receiver)
# zmq.close(subscriber)
# zmq.ctx.destroy(context)
# ## End(Not run)

Run the code above in your browser using DataLab