Simple example
The code below runs glm with the palmerpenguins dataset.
compute <- function(n) {
  library(palmerpenguins)
  # Our dataset
  x <- as.data.frame(penguins[c(4, 1)])
  ind <- sample(344, 344, replace = TRUE)
  result1 <- glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
  coefficients(result1)
}
On the R console, we can now simply run
compute(1)
(Intercept) x[ind, 1]
13.191595 -0.746895
If we now want to run the same compute function, say, 100 times, we can use the sapply function:
res <- sapply(1:100, compute)
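As a quick sketch (not part of the original post), the result of the sapply call is a 2 × 100 matrix with one column per bootstrap replicate, so the coefficient estimates can be summarised directly, for example with percentile intervals:
dim(res)
# Percentile bootstrap intervals for the intercept and the slope
apply(res, 1, quantile, probs = c(0.025, 0.975))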
Let’s run this function 10, 100 and 1,000 times and measure the compute time:
library(microbenchmark)
microbenchmark(
  sapply(1:10, compute),
  sapply(1:100, compute),
  sapply(1:1000, compute),
  times = 10
)
Unit: milliseconds
expr min lq mean median uq
sapply(1:10, compute) 26.2505 26.34203 27.85132 26.86744 29.82167
sapply(1:100, compute) 275.4522 277.92133 281.41817 281.80109 283.90646
sapply(1:1000, compute) 2727.4093 2804.23806 2826.31019 2820.88694 2851.78349
max neval
32.20464 10
289.00417 10
2925.19585 10
As we can see, the compute time increases fairly linearly with the number of calculations. With 1000 computations we reach almost 3 seconds.
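A quick back-of-the-envelope check, using the mean timings reported above (this calculation is not in the original post), confirms the linear scaling:
# Mean time in ms divided by the number of calls, for 10, 100 and 1000 calls
c(27.85132, 281.41817, 2826.31019) / c(10, 100, 1000)
All three ratios come out at roughly 2.8 ms per call, which is why 1,000 calls take just under 3 seconds.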
Now, let’s do the same with clustermq and the Q function.
First, let’s define the appropriate template:
options(
clustermq.scheduler = "slurm",
clustermq.template = "slurm.tmpl"
)
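The slurm.tmpl file itself is not shown here. As a rough sketch, a minimal template along the lines of the example in the clustermq documentation could be written out as follows; the exact #SBATCH lines are an assumption and will need adapting to your cluster (partition, memory, and so on):
# Assumption: a minimal SLURM template, written from R for convenience
writeLines(c(
  "#!/bin/sh",
  "#SBATCH --job-name={{ job_name }}",
  "#SBATCH --output={{ log_file | /dev/null }}",
  "#SBATCH --error={{ log_file | /dev/null }}",
  "#SBATCH --mem-per-cpu={{ memory | 4096 }}",
  "#SBATCH --array=1-{{ n_jobs }}",
  "",
  "CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker(\"{{ master }}\")'"
), "slurm.tmpl")
clustermq fills in the {{ }} placeholders when it submits the worker array job.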
library(clustermq)
system.time(res <- Q(compute, n = 1:1000, n_jobs = 1))
Submitting 1 worker jobs (ID: cmq7519) ...
Running 1,000 calculations (0 objs/0 Mb common; 1 calls/chunk) ...
Master: [6.0s 8.7% CPU]; Worker: [avg 90.9% CPU, max 307.6 Mb]
user system elapsed
0.575 0.040 6.049
The Q function shows a progress bar and, at the end, outputs usage statistics for the Master (where the Q function was spawned from, i.e. the R console in RStudio) and the Worker, covering both CPU and memory utilisation.
We see that the same calculation that took 3 seconds with sapply now takes longer. This is caused by the overhead of:
* submitting a job to the queue (it takes time from submission to the actual start of the job; see the sketch below for one way to gauge this part)
* communication between the Master and the Worker processes
* the time taken on the Master process to reduce all the partial results into one final dataset
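As an aside (this comparison is not part of the original benchmarks), one rough way to see how much of the overhead comes from the scheduler is to rerun the same workload with clustermq’s local multiprocess backend, which skips the queue entirely:
# Assumption: running on the same machine, outside SLURM
options(clustermq.scheduler = "multiprocess")
system.time(res <- Q(compute, n = 1:1000, n_jobs = 1))
# Switch back to the cluster scheduler afterwards
options(clustermq.scheduler = "slurm", clustermq.template = "slurm.tmpl")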
Influence of chunk_size
While we cannot do much about the time between submission and the actual start of the job, we can optimise the communication between the Master and the Worker. As we see from our example, clustermq only runs 1 call per chunk, which means the Master sends 1 number to the Worker, has it compute the result and send it back, and only then sends the next number. Sending very small amounts of data over the network is costly and should be avoided: for small amounts of data, transfer time is dominated by network latency rather than network bandwidth. We can optimise by putting more function calls into each chunk. Let’s do it straight in microbenchmark:
library(clustermq)
library(microbenchmark)
microbenchmark(
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1),
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10),
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 100),
  times = 10
)
Unit: seconds
expr
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1)
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10)
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 100)
min lq mean median uq max neval
5.386837 6.002397 5.978394 6.030717 6.062987 6.132101 10
4.687595 4.984848 5.191266 5.015985 5.036211 7.022424 10
4.190342 4.599913 4.976782 4.906269 4.988659 7.057703 10
What you will see is that the larger the chunk_size, the faster the code execution. This is also evident from the increased percentage CPU utilisation of the Worker (not shown in the rendered doc). So with chunk_size=100 we are down to 5 seconds, which is good enough for now.
Scaling up
We can now increase the number of jobs used for our computation and see what happens:
library(clustermq)
library(microbenchmark)
microbenchmark(
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10),
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 2, chunk_size = 10),
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 4, chunk_size = 10),
  res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 8, chunk_size = 10),
  times = 10
)
Unit: seconds
expr
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 1, chunk_size = 10)
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 2, chunk_size = 10)
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 4, chunk_size = 10)
res <- Q(compute, n = 1:1000, verbose = FALSE, n_jobs = 8, chunk_size = 10)
min lq mean median uq max neval
4.414095 4.467724 4.903429 5.023414 5.174582 5.465580 10
2.983095 3.003283 3.518999 3.599471 3.773929 4.165966 10
2.119886 2.281146 2.792152 2.828585 3.271375 3.301467 10
1.967427 2.719131 2.812413 2.750438 2.973397 3.747350 10
We can now see that the more jobs we use, the faster the execution gets. However, using 8 jobs does not make the code run 8 times as fast. This is again caused by the overhead described in the previous section, which cannot be parallelised.
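For a quick check of the actual speed-ups, we can divide the 1-job mean timing from the table above by the others (this calculation is not in the original post):
means <- c(`1 job` = 4.903429, `2 jobs` = 3.518999, `4 jobs` = 2.792152, `8 jobs` = 2.812413)
round(means[1] / means, 2)
This gives roughly 1.4x, 1.8x and 1.7x for 2, 4 and 8 jobs: the per-task work here is so small that the non-parallelisable overhead dominates.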
Let’s give the jobs more work to do by scaling up further, i.e. using n=1:10000 and chunk_size=100.
library(clustermq)
library(microbenchmark)
microbenchmark(
  res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 1, chunk_size = 100),
  res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 2, chunk_size = 100),
  res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 4, chunk_size = 100),
  res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 8, chunk_size = 100),
  times = 10
)
Unit: seconds
expr
res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 1, chunk_size = 100)
res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 2, chunk_size = 100)
res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 4, chunk_size = 100)
res <- Q(compute, n = 1:10000, verbose = FALSE, n_jobs = 8, chunk_size = 100)
min lq mean median uq max neval
29.647617 29.848939 30.235241 30.069296 30.153109 32.349257 10
16.012912 16.095444 16.840333 16.229791 18.390670 18.487193 10
8.758291 8.862055 9.251549 8.963853 9.018703 11.397048 10
5.009530 5.549281 6.064703 5.709003 5.898295 8.019442 10
This is pretty good: 8 jobs give a roughly 5-fold speed-up over 1 job.
Using a foreach loop
Below you will find the same function, now using foreach instead of the Q function.
computeforeach <- function(samples, tasks) {
  library(foreach)
  library(palmerpenguins)
  # Register parallel backend to foreach
  register_dopar_cmq(
    n_jobs = tasks,
    log_worker = FALSE,
    verbose = FALSE,
    chunk_size = 100
  )
  # Our dataset
  x <- as.data.frame(penguins[c(4, 1)])
  # Number of samples to simulate
  samples <- samples
  # Main loop
  foreach(i = 1:samples, .combine = rbind) %dopar% {
    ind <- sample(344, 344, replace = TRUE)
    result1 <- glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
    coefficients(result1)
  }
}
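As a small usage sketch (the object name res_fe below is just illustrative, not from the original), note that because the results are combined with rbind, the foreach version returns one row per replicate, i.e. the transpose of what sapply gave us:
res_fe <- computeforeach(100, 1)
dim(res_fe)  # 100 replicates x 2 coefficients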
library(clustermq)
library(microbenchmark)
microbenchmark(
  computeforeach(10000, 1),
  computeforeach(10000, 2),
  computeforeach(10000, 4),
  computeforeach(10000, 8),
  times = 10
)
Unit: seconds
expr min lq mean median uq
computeforeach(10000, 1) 27.89123 28.034052 28.46545 28.463084 28.850983
computeforeach(10000, 2) 16.07947 16.281416 16.74742 16.916717 17.029817
computeforeach(10000, 4) 10.01754 10.275244 10.66689 10.852624 10.981694
computeforeach(10000, 8) 5.93564 6.122312 7.40991 7.982423 8.173379
max neval
29.060149 10
17.161362 10
11.036780 10
8.283377 10