Simple example

The below code is running glm with the palmerpenguins dataset.

compute <- function(n) {

  library(palmerpenguins)
  
  # Our dataset
  x <- as.data.frame(penguins[c(4, 1)])
  
  ind <- sample(344, 344, replace = TRUE)
  result1 <-
    glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
  coefficients(result1)
}

On the R console we now simply can run

compute(1)
(Intercept)   x[ind, 1] 
  13.191595   -0.746895 

If we now want to run the same compute function say a 100 times, we can use the sapply function

res<-sapply(1:100,compute)

Let’s run this function a 10, 100 and 1000 times and measure the compute time

library(microbenchmark)
microbenchmark(
  sapply(1:10,compute),
  sapply(1:100,compute),
  sapply(1:1000,compute),
  times=10)
Unit: milliseconds
                    expr       min         lq       mean     median         uq
   sapply(1:10, compute)   26.2505   26.34203   27.85132   26.86744   29.82167
  sapply(1:100, compute)  275.4522  277.92133  281.41817  281.80109  283.90646
 sapply(1:1000, compute) 2727.4093 2804.23806 2826.31019 2820.88694 2851.78349
        max neval
   32.20464    10
  289.00417    10
 2925.19585    10

As we can see, the compute time increases fairly linearly with the number of calculations. With 1000 computations we reach almost 3 seconds.

Now, let’s do the same with clustermq and the Q function

First, let’s define the appropriate template

options(
    clustermq.scheduler = "slurm",
    clustermq.template = "slurm.tmpl"
)
library(clustermq) 
system.time(res<-Q(compute, n=1:1000, n_jobs=1))
Submitting 1 worker jobs (ID: cmq7519) ...
Running 1,000 calculations (0 objs/0 Mb common; 1 calls/chunk) ...
Master: [6.0s 8.7% CPU]; Worker: [avg 90.9% CPU, max 307.6 Mb]
   user  system elapsed 
  0.575   0.040   6.049 

The Q function shows a progress bar and at the end will output usage stats for the Master (where the Q function was spawned from. i.e. from the R console in RStudio) and the Worker for both CPU and memory utilisation.

We see that the same calculation that took 3 seconds with sapply now takes longer. This is caused by the overhead caused by * submitting a job to the queue (it takes time from submit to actual start of the job) * communication between the Master and the Worker processes * Time taken on Master process to reduce all the partial results into one final dataset

Influence of chunk_size

While we cannot optimise much as far as the time between submit and actual start of the job is concerned, we can otimise the communication between the Master and the Worker. As we see from our example, clustermq only runs 1 call/chunk, which means the Master sends 1 number to the Worker, gets him to compute the result, send it back and then the Master send the next number. Sending very small amounts of data over the network is always very costly and should be avoided (for small amounts of data, transfer time is limited by network latency rather than actual network bandwidth). We can optimise by adding more function calls to a chunk. Let’s do it straight in microbenchmark

library(clustermq) 
library(microbenchmark)
microbenchmark(
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=1),
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=1, chunk_size=10),
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=1, chunk_size=100),
  times=10
)
Unit: seconds
                                                                         expr
                   res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1)
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10)
 res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 100)
      min       lq     mean   median       uq      max neval
 5.386837 6.002397 5.978394 6.030717 6.062987 6.132101    10
 4.687595 4.984848 5.191266 5.015985 5.036211 7.022424    10
 4.190342 4.599913 4.976782 4.906269 4.988659 7.057703    10

What you will see is that the larger the chunk_size, the faster the code execution. This is also evident by the increased percentual CPU utilisation of the Worker (not shown in the rendered doc). So with chunk_size=100 we are down to 5 seconds which is good enough for now.

Scaling up

We now can increase the number of jobs used for our computation and see what happens

library(clustermq) 
library(microbenchmark)
microbenchmark(
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=1, chunk_size=10),
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=2, chunk_size=10),
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=4, chunk_size=10),
  res<-Q(compute, n=1:1000, verbose=FALSE, n_jobs=8, chunk_size=10),
  times=10
)
Unit: seconds
                                                                        expr
 res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10)
 res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 2, chunk_size = 10)
 res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 4, chunk_size = 10)
 res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 8, chunk_size = 10)
      min       lq     mean   median       uq      max neval
 4.414095 4.467724 4.903429 5.023414 5.174582 5.465580    10
 2.983095 3.003283 3.518999 3.599471 3.773929 4.165966    10
 2.119886 2.281146 2.792152 2.828585 3.271375 3.301467    10
 1.967427 2.719131 2.812413 2.750438 2.973397 3.747350    10

We now can see that the more jobs we use the faster the job execution gets - we however also see that using 8 jobs does not make the code run 8 times as fast. This again is caused by the overhead we described in the previous section that cannot be parallelized.

Let’s try and get the jobs more work to do by scaling up further, i.e. use n=1:10000 and chunk_size=100.

library(clustermq) 
library(microbenchmark)
microbenchmark(
  res<-Q(compute, n=1:10000, verbose=FALSE, n_jobs=1, chunk_size=100),
  res<-Q(compute, n=1:10000, verbose=FALSE, n_jobs=2, chunk_size=100),
  res<-Q(compute, n=1:10000, verbose=FALSE, n_jobs=4, chunk_size=100),
  res<-Q(compute, n=1:10000, verbose=FALSE, n_jobs=8, chunk_size=100),
  times=10
)
Unit: seconds
                                                                          expr
 res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 1, chunk_size = 100)
 res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 2, chunk_size = 100)
 res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 4, chunk_size = 100)
 res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 8, chunk_size = 100)
       min        lq      mean    median        uq       max neval
 29.647617 29.848939 30.235241 30.069296 30.153109 32.349257    10
 16.012912 16.095444 16.840333 16.229791 18.390670 18.487193    10
  8.758291  8.862055  9.251549  8.963853  9.018703 11.397048    10
  5.009530  5.549281  6.064703  5.709003  5.898295  8.019442    10

This is pretty good - 8 jobs provide 5 times faster run time compared to 1 job.

Using a foreach loop

Below you will find the same function, now using foreach instead of the Q function.

computeforeach <- function(samples, tasks) {
  library(foreach)
  library(palmerpenguins)
  
  # Register parallel backend to foreach
  register_dopar_cmq(
    n_jobs = tasks,
    log_worker = FALSE,
    verbose=FALSE, 
    chunk_size=100
  )
  
  # Our dataset
  x <- as.data.frame(penguins[c(4, 1)])
  
  # Number of samples to simulate
  samples <- samples
  
  # Main loop
  foreach(i = 1:samples, .combine = rbind) %dopar% {
    ind <- sample(344, 344, replace = TRUE)
    result1 <-
      glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
    coefficients(result1)
  }
}
library(clustermq) 
library(microbenchmark)
microbenchmark(
  computeforeach(10000,1),
  computeforeach(10000,2),
  computeforeach(10000,4),
  computeforeach(10000,8),
  times=10
)
Unit: seconds
                     expr      min        lq     mean    median        uq
 computeforeach(10000, 1) 27.89123 28.034052 28.46545 28.463084 28.850983
 computeforeach(10000, 2) 16.07947 16.281416 16.74742 16.916717 17.029817
 computeforeach(10000, 4) 10.01754 10.275244 10.66689 10.852624 10.981694
 computeforeach(10000, 8)  5.93564  6.122312  7.40991  7.982423  8.173379
       max neval
 29.060149    10
 17.161362    10
 11.036780    10
  8.283377    10