BiocParallel (version 1.4.0)

bpiterate: Parallel iteration over an indeterminate number of data chunks

Description

bpiterate iterates over an indeterminate number of data chunks (e.g., records in a file). Each chunk is processed by parallel workers in an asynchronous fashion; as each worker finishes it receives a new chunk. Data are traversed a single time.

Usage

bpiterate(ITER, FUN, ..., BPPARAM=bpparam())

Arguments

ITER
A function with no arguments that returns an object to process, generally a chunk of data from a file. When no objects are left (i.e., end of file) it should return NULL and continue to return NULL regardless of the number of times it is invoked after reaching the end of file. This function is run on the master.
FUN
A function to process the object returned by ITER; run on parallel workers separate from the master. When BPPARAM is a MulticoreParam, FUN is "decorated" with additional arguments and therefore must have ... in its signature.
BPPARAM
An optional BiocParallelParam instance determining the parallel back-end to be used during evaluation, or a list of BiocParallelParam instances, to be applied in sequence for nested calls to bpiterate.
...
Arguments passed to other methods: named arguments for FUN, or any of REDUCE, init, and reduce.in.order.

  • REDUCE: Optional function that combines (reduces) output from FUN. As each worker returns, the data are combined with the REDUCE function. REDUCE takes 2 arguments; one is the current result and the other is the output of FUN from a worker that just finished.

  • init: Optional initial value for REDUCE; must be of the same type as the object returned from FUN. When supplied, reduce.in.order is set to TRUE.
  • reduce.in.order: Logical. When TRUE, REDUCE is applied to the results from the workers in the same order the tasks were sent out.
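
As a rough illustration of how REDUCE and init interact, the base R sketch below folds per-chunk results one at a time. It does not call bpiterate: fold_results and chunk_outputs are hypothetical names, and the outputs are combined sequentially here, whereas bpiterate combines them as workers return.

```r
## Hypothetical per-chunk outputs, standing in for what FUN returns
## from three workers.
chunk_outputs <- list(c(1, 2), c(3, 4), c(5, 6))

## Fold the outputs the way a two-argument REDUCE is applied: the first
## argument is the running result, the second is a new FUN output.
fold_results <- function(outputs, REDUCE, init) {
    if (missing(init)) {
        ## Without init, the first output seeds the running result.
        result <- outputs[[1]]
        outputs <- outputs[-1]
    } else {
        result <- init
    }
    for (out in outputs)
        result <- REDUCE(result, out)
    result
}

total <- fold_results(chunk_outputs, `+`)
total_init <- fold_results(chunk_outputs, `+`, init = c(10, 10))
```

With an order-sensitive REDUCE such as c, the combined value depends on the order in which outputs arrive, which is the situation reduce.in.order = TRUE is meant to address.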

Value

A list with one element per chunk returned by ITER. When REDUCE is supplied, the list has length 1.

Details

Supported for SnowParam and MulticoreParam.

bpiterate iterates through an unknown number of data chunks, dispatching each chunk to a parallel worker as workers become available. In contrast, other bp*apply functions such as bplapply or bpmapply require the number of data chunks to be specified ahead of time. This makes bpiterate useful for iterating through files of unknown length.
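
The control flow described above can be sketched sequentially in base R. This is only a stand-in to show why the number of chunks need not be known in advance; iterate_serially and make_iter are hypothetical helpers, not part of BiocParallel, and no parallel workers are involved.

```r
## Sequential stand-in for the dispatch loop: keep calling ITER until it
## returns NULL, applying FUN to each chunk as it arrives.
iterate_serially <- function(ITER, FUN, ...) {
    results <- list()
    repeat {
        chunk <- ITER()
        if (is.null(chunk))
            break
        results[[length(results) + 1L]] <- FUN(chunk, ...)
    }
    results
}

## Hypothetical iterator factory; the loop above never asks how many
## chunks it holds.
make_iter <- function(chunks) {
    i <- 0L
    function() {
        if (i >= length(chunks))
            return(NULL)
        i <<- i + 1L
        chunks[[i]]
    }
}

res <- iterate_serially(make_iter(list(1:3, 4:6, 7:8)),
                        function(x, ...) sum(x))
```

The result has one element per chunk, mirroring the return value described under Value when REDUCE is not supplied.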

ITER serves up chunks of data until the end of the file is reached, at which point it returns NULL. Note that ITER should continue to return NULL regardless of the number of times it is invoked after reaching the end of the file. FUN is applied to each object (data chunk) returned by ITER.
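
A minimal sketch of an iterator that honors this contract, using an in-memory text connection in place of a file. The name lineIterator and the chunk size are assumptions made for illustration; the pattern (a closure with a done flag) follows the iterators in the Examples section.

```r
## Hypothetical iterator factory: each call to the returned function
## reads up to 'n' lines from the connection.
lineIterator <- function(con, n) {
    done <- FALSE
    function() {
        if (done)
            return(NULL)
        lines <- readLines(con, n = n)
        if (length(lines) == 0L) {
            ## End of input: release the connection and remember that
            ## every later call must also return NULL.
            close(con)
            done <<- TRUE
            return(NULL)
        }
        lines
    }
}

## Five lines of in-memory text stand in for a file.
con <- textConnection(c("a", "b", "c", "d", "e"))
ITER <- lineIterator(con, n = 2)

chunk1 <- ITER()   # first two lines
chunk2 <- ITER()   # next two lines
chunk3 <- ITER()   # final, shorter chunk
end1 <- ITER()     # end of input
end2 <- ITER()     # repeated call after the end
```

The done flag is what keeps repeated calls safe: once the connection is closed, the iterator never touches it again.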

Examples

    ## Not run: 
    # if (all(require(Rsamtools) && 
    #         require(RNAseqData.HNRNPC.bam.chr14) &&
    #         require(GenomicAlignments) &&
    #         require(ShortRead))) { 
    # 
    #   ## ----------------------------------------------------------------------
    #   ## Iterate through a BAM file
    #   ## ----------------------------------------------------------------------
    #  
    #   ## Select a single file and set 'yieldSize' in the BamFile object.
    #   fl <- RNAseqData.HNRNPC.bam.chr14_BAMFILES[[1]]
    #   bf <- BamFile(fl, yieldSize = 300000) 
    #  
    #   ## bamIterator() is initialized with a BAM file and returns a function.
    #   ## The return function requires no arguments and iterates through the
    #   ## file returning data chunks the size of yieldSize.
    #   bamIterator <- function(bf) {
    #       done <- FALSE
    #       if (!isOpen(bf))
    #           open(bf)
    #  
    #       function() {
    #           if (done)
    #               return(NULL)
    #           yld <- readGAlignments(bf) 
    #           if (length(yld) == 0L) {
    #               close(bf)
    #               done <<- TRUE
    #               NULL
    #           } else yld
    #       }
    #   }
    #  
    #   ## FUN counts reads in a region of interest.
    #   roi <- GRanges("chr14", IRanges(seq(19e6, 107e6, by = 10e6), width = 10e6))
    #   counter <- function(reads, roi, ...) {
    #       countOverlaps(query = roi, subject = reads)
    #   }
    # 
    #   ## Initialize the iterator.
    #   ITER <- bamIterator(bf)
    # 
    #   ## The number of chunks returned by ITER() determines the result length.
    #   bpparam <- MulticoreParam(workers = 3) 
    #   bpiterate(ITER, counter, roi = roi, BPPARAM = bpparam)
    # 
    #   ## Re-initialize the iterator and combine on the fly with REDUCE:
    #   ITER <- bamIterator(bf)
    #   bpparam <- MulticoreParam(workers = 3) 
    #   bpiterate(ITER, counter, REDUCE = sum, roi = roi, BPPARAM = bpparam)
    # 
    #   ## ----------------------------------------------------------------------
    #   ## Iterate through a FASTQ file
    #   ## ----------------------------------------------------------------------
    #  
    #   ## Set data chunk size with 'n' in the FastqStreamer object.
    #   sp <- SolexaPath(system.file('extdata', package = 'ShortRead'))
    #   fl <- file.path(analysisPath(sp), "s_1_sequence.txt")
    #  
    #   ## Create an iterator that returns data chunks the size of 'n'.
    #   fastqIterator <- function(fqs) {
    #       done <- FALSE
    #       if (!isOpen(fqs))
    #           open(fqs)
    #  
    #       function() {
    #           if (done)
    #               return(NULL)
    #           yld <- yield(fqs) 
    #           if (length(yld) == 0L) {
    #               close(fqs)
    #               done <<- TRUE
    #               NULL
    #           } else yld
    #       }
    #   }
    #  
    #   ## The process function summarizes the number of times each sequence occurs.
    #   summary <- function(reads, ...) {
    #        ShortRead::tables(reads, n = 0)$distribution
    #   }
    # 
    #   ## Create a param.
    #   bpparam <- SnowParam(workers = 2) 
    # 
    #   ## Initialize the streamer and iterator.
    #   fqs <- FastqStreamer(fl, n = 100)
    #   ITER <- fastqIterator(fqs)
    #   bpiterate(ITER, summary, BPPARAM = bpparam)
    #  
    #   ## Results from the workers are combined on the fly when REDUCE is used.
    #   ## Collapsing the data in this way can substantially reduce memory 
    #   ## requirements.
    #   fqs <- FastqStreamer(fl, n = 100)
    #   ITER <- fastqIterator(fqs)
    #   bpiterate(ITER, summary, REDUCE = merge, all = TRUE, BPPARAM = bpparam)
    # 
    #   }
    # ## End(Not run)
    
