Disclaimer: We’re talking about explicit parallelism!
Why do we need to parallelise code?
This is what code usually looks like:
N <- 1000
result <- vector("list", N)
for (i in 1:N){
result[[i]] <- myFunction(...)
}
doStuff(result, ...)
This can be translated into an example everyone knows from school:
1 worker takes 6 months to build a house.
How long will it take to build 12 houses?
Let’s go parallel!
1 worker takes 6 months to build a house.
How long will 12 workers need to build 1 house?
Therefore: we can obviously improve the performance of the overall task by increasing the number of workers/processors/threads.
Pros and Cons:
install.packages(c("parallel", "snow", "doParallel"), dependencies = TRUE, repos = "http://cran.us.r-project.org")
Whenever you do things in parallel, there are some obstacles you need to overcome.
Steps from your sequential source code towards going parallel:
Analyse at which point the execution is ‘wasting’ time.
Can this part of the code be optimised (in a sequential manner)?
Can this part of the code be parallelised?
Steps 1 & 2 can easily be solved by a simple analysis of the code.
Step 3 is where things become more complex.
You need to ask questions like:
Advanced:
Let’s find out what these questions mean:
Shared data source: e.g. a data frame used as a parameter for functions
Example:
Thread A | Thread B
---|---
A1: Read V | B1: Read V
A2: Add 1 to V | B2: Add 1 to V
A3: Write back to V | B3: Write back to V
If B1 is executed somewhere between A1 and A3 (or vice versa), the code will return incorrect data: V = 2 instead of V = 3! Now imagine you put a lock on each variable while you’re using it.
Thread A | Thread B
---|---
A1: Read V | B1: Read W
A2: Read W | B2: Read V
A3: Wait for W to be unlocked! | B3: Wait for V to be unlocked!
Now both threads wait for each other forever: a deadlock.
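In R this particular race cannot happen with the clusters used below, because PSOCK workers do not share memory with the master: each worker gets its own copy of every exported variable. A minimal sketch (run at top level; variable names are made up) showing that updates to those copies are simply lost:

```r
library(parallel)

cl <- makeCluster(2, type = "PSOCK")

V <- 1
clusterExport(cl, "V")  # each worker receives its own private copy of V

# Both workers "increment V", as in the table above
res <- clusterApply(cl, 1:2, function(i) {
  V <<- V + 1  # modifies only the worker's own copy
  V
})

unlist(res)  # both workers report 2: each incremented its private copy
V            # still 1: the master's V was never touched

stopCluster(cl)
```

So instead of races you get isolation: results must be returned as function values and combined on the master.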
lapply and sapply call a function for each item of a vector or a list
Reading in ATM data
A <- read.table("http://www.trutschnig.net/Datensatz.txt", head=TRUE)
head(A)
Create a data frame to compare the runtimes
funct <- c("lapply", "sapply", "parLapply", "parSapply", "doParallel")
runtime <- c(0,0,0,0,0)
compare <- data.frame(funct, runtime)
lapply returns a list
lapply(1:3, function(x) log10(x))
init <- Sys.time()
lapply(A$sum_out, function(x) log10(x))
compare$runtime[1] <- Sys.time() - init
sapply returns a vector or matrix
a = 2
sapply(1:3, function(x) c("log" = log10(x), "quad" = x^a))
init <- Sys.time()
sapply(A$sum_out, function(x) c("quad" = x^a, "log" = log10(x)))
compare$runtime[2] <- Sys.time() - init
compare
address <- url("http://www.trutschnig.net/RTR2015.RData")
load(address)
head(RTR2015)
Solution
# # Create dataframe
# funct <- c("lapply", "sapply", "parLapply", "parSapply", "doParallel")
# runtime <- c(0,0,0,0,0)
# compare2 <- data.frame(funct, runtime)
#
# # Calculate mean and variance
# avg_dl <- mean(RTR2015$rtr_speed_dl)
# sd_dl <- sd(RTR2015$rtr_speed_dl)
#
# # Lapply
# init <- Sys.time()
# lapply(RTR2015$rtr_speed_dl, function(x) (x - avg_dl) / sd_dl)
# compare2$runtime[1] <- Sys.time() - init
#
# # Sapply
# init <- Sys.time()
# sapply(RTR2015$rtr_speed_dl, function(x) c("init" = x, "std" = (x - avg_dl) / sd_dl))
# compare2$runtime[2] <- Sys.time() - init
#
# # Print out the comparison matrix
# compare2
Types of clusters supported by the parallel package:
PSOCK (Parallel Socket Cluster): creates new empty R sessions (nothing is inherited), default, for all operating systems
FORK: copies current R session locally (everything is inherited from master process), only UNIX/Mac
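The difference matters in practice: with PSOCK, global variables are not visible on the workers unless you export them, while FORK workers inherit everything. A small sketch (the error branch triggers when run at top level, since a PSOCK worker’s empty session has no `a`):

```r
library(parallel)

a <- 2

# PSOCK: workers start as empty R sessions, so 'a' is unknown there
cl <- makeCluster(2, type = "PSOCK")
psock_res <- tryCatch(
  clusterApply(cl, 1:2, function(i) i^a),
  error = function(e) "error: 'a' is not found on the workers"
)
stopCluster(cl)
psock_res

# FORK (UNIX/Mac only): workers inherit the master's session, so 'a' just works
if (.Platform$OS.type == "unix") {
  cl <- makeCluster(2, type = "FORK")
  fork_res <- clusterApply(cl, 1:2, function(i) i^a)
  stopCluster(cl)
  unlist(fork_res)
}
```

This is why the PSOCK examples below need `clusterExport()` before the workers can use variables like `a`.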
Start and stop cluster with one worker per core
library(parallel)
# get the number of cores available
no_cores <- detectCores(logical = TRUE) - 1
no_cores
## [1] 3
# Initialize cluster
cl <- makeCluster(no_cores, type = "PSOCK")
# Check properties
length(cl)
## [1] 3
cl[[1]]
## node of a socket cluster on host 'localhost' with pid 7412
# do stuff
# (clusterApply, parLapply, parSapply, doParallel ...)
# release bound resources
stopCluster(cl)
clusterApply: the basis for the following functions; the input has to be a vector, and it returns a list
cl <- makeCluster(no_cores)
clusterApply(cl, 1:3, fun = function(x) x^2)
## [[1]]
## [1] 1
##
## [[2]]
## [1] 4
##
## [[3]]
## [1] 9
stopCluster(cl)
input can be a list or a vector, returns a list
cl <- makeCluster(no_cores)
init <- Sys.time()
parLapply(cl, A$sum_out, function(x) log10(x))
compare$runtime[3] <- Sys.time() - init
stopCluster(cl)
input can be a list or a vector, returns a vector or a matrix
cl <- makeCluster(no_cores)
# Get outer value
clusterExport(cl, "a")
# clusterEvalQ(package)
init <- Sys.time()
parSapply(cl, A$sum_out, function(x) c("quad" = x^a, "log" = log10(x)))
compare$runtime[4] <- Sys.time() - init
stopCluster(cl)
compare
Solution
# Integrate the parallel library
library(parallel)
# Calculate the number of cores
no_cores <- detectCores() - 1
no_cores
# Create a cluster
cl <- makeCluster(no_cores)
# Export objects from the master to the workers
clusterExport(cl, c("avg_dl", "sd_dl"))
# parLapply
init <- Sys.time()
parLapply(cl, RTR2015$rtr_speed_dl, function(x) (x - avg_dl) / sd_dl)
compare2$runtime[3] <- Sys.time() - init
# parSapply
init <- Sys.time()
parSapply(cl, RTR2015$rtr_speed_dl, function(x) c("init" = x, "std" = (x - avg_dl) / sd_dl))
compare2$runtime[4] <- Sys.time() - init
# Stop the cluster
stopCluster(cl)
# Compare the runtimes
compare2
Amdahl’s law: \(S(n,p) = \frac{1}{(1-p) + p/n}\)
S(n,p) := Speedup depending on n and p
n := Number of cores
p := Percentage of parallel code
What are the results from this equation?
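Plugging in a few values makes the message concrete: the sequential fraction caps the speedup no matter how many cores you add. A quick sketch of the formula above:

```r
# Speedup S(n, p): n cores, p = fraction of the code that runs in parallel
speedup <- function(n, p) 1 / ((1 - p) + p / n)

# Even with 95% parallel code, the speedup is capped:
# as n grows, S approaches 1 / (1 - p) = 20
speedup(n = 4,    p = 0.95)   # ~3.48
speedup(n = 16,   p = 0.95)   # ~9.14
speedup(n = 1000, p = 0.95)   # ~19.6

# Half-sequential code barely benefits, even on 4 cores
speedup(n = 4,    p = 0.50)   # 1.6
```

So before adding workers, it pays to shrink the sequential part `1 - p` first.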
library(foreach)
## Warning: package 'foreach' was built under R version 3.4.4
# Short example for a matrix
foreach(i = 1:3, .export = "a", .combine = rbind) %do% {
c("log" = log10(i), "quad" = i^a)
}
## log quad
## result.1 0.0000000 1
## result.2 0.3010300 4
## result.3 0.4771213 9
library(doParallel)
cl <- makeCluster(no_cores)
registerDoParallel(cl)
# Check properties
getDoParRegistered()
getDoParWorkers()
# Like sapply using foreach
init <- Sys.time()
foreach(i = A$sum_out, .export = "a", .combine = rbind) %dopar%
c("log" = log10(i), "quad" = i^a)
compare$runtime[5] <- Sys.time() - init
# Stop cluster
stopCluster(cl)
# Compare runtimes
plot(compare, sub = "Runtime of each function")
Solution
library(doParallel)
# Create and register cluster
cl <- makeCluster(no_cores)
registerDoParallel(cl)
# Check the properties
getDoParRegistered()
getDoParWorkers()
# foreach doParallel
init <- Sys.time()
foreach(i = RTR2015$rtr_speed_dl, .export = c("avg_dl", "sd_dl"), .combine = rbind) %dopar% {
c("init" = i, "std" = (i - avg_dl) / sd_dl)
}
compare2$runtime[5] <- Sys.time() - init
# Stop cluster
stopCluster(cl)
# Plot runtimes
plot(compare2, sub = "Runtime of each function")
Beyond the parallelisation methods described so far, you can additionally apply load balancing to your executions.
cl <- parallel::makeCluster(no_cores)
parallel::parLapplyLB(cl, A$sum_out, function(x) log10(x))
stopCluster(cl)
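Load balancing pays off when the tasks have uneven durations: `parLapply` pre-splits the input into fixed chunks, while `parLapplyLB` hands the next task to whichever worker becomes free. A hypothetical sketch with made-up "durations" simulated via `Sys.sleep`:

```r
library(parallel)

cl <- makeCluster(2)

# One long task followed by several short ones: static chunking can
# put the long task together with short ones on the same worker.
tasks <- c(0.4, 0.1, 0.1, 0.1, 0.1)  # seconds of simulated work per task

t_static <- system.time(parLapply(cl, tasks, Sys.sleep))["elapsed"]
t_lb     <- system.time(parLapplyLB(cl, tasks, Sys.sleep))["elapsed"]

stopCluster(cl)

c(static = t_static, balanced = t_lb)
```

With such skewed task sizes the load-balanced variant typically finishes earlier, at the cost of more communication between master and workers.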
Everyone likes graphs, so: How do we visualize cluster usage? Well, we’re using R so there is a package with a suitable function!
library("snow")
## Warning: package 'snow' was built under R version 3.4.4
##
## Attaching package: 'snow'
## The following objects are masked from 'package:parallel':
##
## clusterApply, clusterApplyLB, clusterCall, clusterEvalQ,
## clusterExport, clusterMap, clusterSplit, makeCluster,
## parApply, parCapply, parLapply, parRapply, parSapply,
## splitIndices, stopCluster
BUT: As you can see from the messages printed when loading the package, there are some trade-offs.
Some functions of the parallel package will be masked by their snow implementations. Furthermore, the function we use, snow.time(), is only experimental, without any guarantees!
How to use it?
isPrime <- function(n){
if (n < 2) return(FALSE)  # 0 and 1 are not prime
if (n < 4) return(TRUE)   # 2 and 3 are prime; also avoids the empty loop 2:(n-1)
res <- TRUE
for(i in 2:(n-1)){
if((n%%i) == 0) {
res <- FALSE
break
}
}
res
}
cl <- parallel::makeCluster(no_cores)
nums <- 10:71  # avoid masking base::seq
ctime <- snow::snow.time(snow::clusterApply(cl, nums, fun = isPrime))
plot(ctime)
ctime
## elapsed send receive node 1 node 2 node 3
## 0.68 0.11 -0.07 0.58 0.55 0.55
Solution
library("snow")
library("parallel")
# Calculate mean and standard deviation
avg_dl <- mean(RTR2015$rtr_speed_dl)
sd_dl <- sd(RTR2015$rtr_speed_dl)
# Create a cluster
cl <- parallel::makeCluster(no_cores)
# Export objects from the master to the workers
clusterExport(cl, "avg_dl")
clusterExport(cl, "sd_dl")
# parLapply
ctime <- snow::snow.time(snow::parLapply(cl, RTR2015$rtr_speed_dl, function(x) (x - avg_dl) / sd_dl))
## Warning in UseMethod("as.list"): unused connection 8 (<-LAPTOP-
## TFH4TRBI:11820) closed
## Warning in UseMethod("as.list"): unused connection 7 (<-LAPTOP-
## TFH4TRBI:11820) closed
## Warning in UseMethod("as.list"): unused connection 6 (<-LAPTOP-
## TFH4TRBI:11820) closed
# parSapply
ctimeSap <- snow::snow.time(snow::parSapply(cl, RTR2015$rtr_speed_dl, function(x) c("init" = x, "std" = (x - avg_dl) / sd_dl)))
# Stop the cluster
stopCluster(cl)
plot(ctime, title = "Cluster Usage parLapply")
plot(ctimeSap, title = "Cluster Usage parSapply")