
R語言并行化基礎與提高
本文將介紹R中的并行計算,并給出了一些常見的陷進以及避免它們的小技巧。
使用并行計算的原因就是因為程序運行時間太長。大部分程序都是可以并行化的,它們大部分都是Embarrassingly parallel。這里介紹幾種可以并行化的方法:
Bootstrapping
交叉驗證(Cross-validation)
(Multivariate Imputation by Chained Equations ,MICE)相關介紹:R語言中的缺失值處理
擬合多元回歸方程
學習lapply是關鍵
沒有早點學習lapply是我的遺憾之一。這函數即優美又簡單:它只需要一個參數(一個vector或list),和一個以該參數為輸入的函數,最后返回一個列表。
> lapply(1:3, function(x) c(x, x^2, x^3))
[[1]]
[1] 1 1 1
[[2]]
[1] 2 4 8
[[3]]
[1] 3 9 27
你還可以添加額外的參數:
> lapply(1:3/3, round, digits=3)
[[1]]
[1] 0.333
[[2]]
[1] 0.667
[[3]]
[1] 1
當每個元素都是獨立地計算時,這個任務就是 Embarrassingly parallel的。當你學習完使用lapply之后,你會發現并行化你的代碼就像喝水一樣簡單。
parallel包
使用 parallel包,首先要初始化一個集群,這個集群的數量最好是你CPU核數-1。如果一臺8核的電腦建立了數量為8的集群,那你的CPU就干不了其他事情了。所以可以這樣啟動一個集群:
library(parallel)
# Calculate the number of cores
no_cores <- detectCores() - 1
# Initiate cluster
cl <- makeCluster(no_cores)
現在只需要使用并行化版本的lapply,parLapply就可以了
parLapply(cl, 2:4,
function(exponent)
2^exponent)
[[1]]
[1] 4
[[2]]
[1] 8
[[3]]
[1] 16
當我們結束后,要記得關閉集群,否則你電腦的內存會始終被R占用
stopCluster(cl)
變量作用域
在Mac/Linux中你可以使用 makeCluster(no_core, type="FORK")這一選項從而當你并行運行的時候可以包含所有環境變量。
在Windows中由于使用的是Parallel Socket Cluster (PSOCK),所以每個集群只會加載base包,所以你運行時要指定加載特定的包或變量:
cl<-makeCluster(no_cores)
base <- 2
clusterExport(cl, "base")
parLapply(cl,
2:4,
function(exponent)
base^exponent)
stopCluster(cl)
[[1]]
[1] 4
[[2]]
[1] 8
[[3]]
[1] 16
注意到你需要用clusterExport(cl,
"base")把base這一個變量加載到集群當中。如果你在函數中使用了一些其他的包就要使用clusterEvalQ加載進去,比如說,使用rms包,那么就用clusterEvalQ(cl,
library(rms))。要注意的是,在clusterExport 加載某些變量后,這些變量的任何變化都會被忽略:
cl<-makeCluster(no_cores)
clusterExport(cl, "base")
base <- 4
# Run
parLapply(cl,
2:4,
function(exponent)
base^exponent)
# Finish
stopCluster(cl)
[[1]]
[1] 4
[[2]]
[1] 8
[[3]]
[1] 16
使用parSapply
如果你想程序返回一個向量或者矩陣。而不是一個列表,那么就應該使用sapply,他同樣也有并行版本parSapply:
> parSapply(cl, 2:4,
function(exponent)
base^exponent)
[1] 4 8 16
輸出矩陣并顯示行名和列名(因此才需要使用as.character)
> parSapply(cl, as.character(2:4),
function(exponent){
x <- as.numeric(exponent)
c(base = base^x, self = x^x)
})
2 3 4
base 4 8 16
self 4 27 256
foreach包
設計foreach包的思想可能想要創建一個lapply和for循環的標準,初始化的過程有些不同,你需要register注冊集群:
library(foreach)
library(doParallel)
cl<-makeCluster(no_cores)
registerDoParallel(cl)
要記得最后要結束集群(不是用stopCluster()):
stopImplicitCluster()
foreach函數可以使用參數.combine控制你匯總結果的方法:
> foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
[1] 4 8 16
> foreach(exponent = 2:4,
.combine = rbind) %dopar%
base^exponent
[,1]
result.1 4
result.2 8
result.3 16
foreach(exponent = 2:4,
.combine = list,
.multicombine = TRUE) %dopar%
base^exponent
[[1]]
[1] 4
[[2]]
[1] 8
[[3]]
[1] 16
注意到最后list的combine方法是默認的。在這個例子中用到一個.multicombine參數,他可以幫助你避免嵌套列表。比如說list(list(result.1, result.2), result.3) :
> foreach(exponent = 2:4,
.combine = list) %dopar%
base^exponent
[[1]]
[[1]][[1]]
[1] 4
[[1]][[2]]
[1] 8
[[2]]
[1] 16
變量作用域
在foreach中,變量作用域有些不同,它會自動加載本地的環境到函數中:
> base <- 2
> cl<-makeCluster(2)
> registerDoParallel(cl)
> foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
stopCluster(cl)
[1] 4 8 16
但是,對于父環境的變量則不會加載,以下這個例子就會拋出錯誤:
test <- function (exponent) {
foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
}
test()
Error in base^exponent : task 1 failed - "object 'base' not found"
為解決這個問題你可以使用.export這個參數而不需要使用clusterExport。注意的是,他可以加載最終版本的變量,在函數運行前,變量都是可以改變的:
base <- 2
cl<-makeCluster(2)
registerDoParallel(cl)
base <- 4
test <- function (exponent) {
foreach(exponent = 2:4,
.combine = c,
.export = "base") %dopar%
base^exponent
}
test()
stopCluster(cl)
[1] 4 8 16
類似的你可以使用.packages參數來加載包,比如說:.packages = c("rms", "mice")
使用Fork還是sock?
我在windows上做了很多分析,也習慣了使用PSOCK系統。對于使用其他系統的人要意識到這兩個的區別:
FORK:”to divide in branches and go separate ways”
系統:Unix/Mac (not Windows)
環境: 所有
PSOCK:并行socket集群
系統: All (including Windows)
環境: 空
內存控制
如果你不打算使用windows的話,建議你嘗試FORK模式,它可以實現內存共享,節省你的內存。
PSOCK:
library(pryr) # Used for memory analyses
cl<-makeCluster(no_cores)
clusterExport(cl, "a")
clusterEvalQ(cl, library(pryr))
parSapply(cl, X = 1:10, function(x) {address(a)}) == address(a)
[1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE
FORK :
cl<-makeCluster(no_cores, type="FORK")
parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
[1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
你不需要花費太多時間去配置你的環境,有趣的是,你不需要擔心變量沖突:
b <- 0
parSapply(cl, X = 1:10, function(x) {b <- b + 1; b})
# [1] 1 1 1 1 1 1 1 1 1 1
parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b})
# [1] 1 2 3 4 5 1 2 3 4 5
b
# [1] 0
調試
當你在并行環境中工作是,debug是很困難的,你不能使用browser/cat/print等函數來發現你的問題。
tryCatch-list方法
使用stop()函數這不是一個好方法,因為當你收到一個錯誤信息時,很可能這個錯誤信息你在很久之前寫的,都快忘掉了,但是當你的程序跑了1,2天后,突然彈出這個錯誤,就只因為這一個錯誤,你的程序終止了,并把你之前的做的計算全部扔掉了,這是很討厭的。為此,你可以嘗試使用tryCatch去捕捉那些錯誤,從而使得出現錯誤后程序還能繼續執行:
foreach(x=list(1, 2, "a")) %dopar%
{
tryCatch({
c(1/x, x, 2^x)
}, error = function(e) return(paste0("The variable '", x, "'",
" caused the error: '", e, "'")))
}
[[1]]
[1] 1 1 2
[[2]]
[1] 0.5 2.0 4.0
[[3]]
[1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator\n'"
這也正是我喜歡list的原因,它可以方便的將所有相關的數據輸出,而不是只輸出一個錯誤信息。這里有一個使用rbind在lapply進行conbine的例子:
`out <- lapply(1:3, function(x) c(x, 2^x, x^x))
do.call(rbind, out)
[,1] [,2] [,3]
[1,] 1 2 1
[2,] 2 4 4
[3,] 3 8 27
創建一個文件輸出
當我們無法在控制臺觀測每個工作時,我們可以設置一個共享文件,讓結果輸出到文件當中,這是一個想當舒服的解決方案:
cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a")) %dopar%
{
print(x)
}
stopCluster(cl)
starting worker pid=7392 on localhost:11411 at 00:11:21.077
starting worker pid=7276 on localhost:11411 at 00:11:21.319
starting worker pid=7576 on localhost:11411 at 00:11:21.762
[1] 2]
[1] "a"
創建一個結點專用文件
一個或許更為有用的選擇是創建一個結點專用的文件,如果你的數據集存在一些問題的時候,可以方便觀測:
cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a")) %dopar%
{
cat(dput(x), file = paste0("debug_file_", x, ".txt"))
}
stopCluster(cl)
partools包
partools這個包有一個dbs()函數或許值得一看(使用非windows系統值得一看),他允許你聯合多個終端給每個進程進行debug。
Caching
當做一個大型計算時,我強烈推薦使用一些緩存。這或許有多個原因你想要結束計算,但是要遺憾地浪費了計算的寶貴的時間。這里有一個包可以做緩存,R.cache,但是我發現自己寫個函數來實現更加簡單。你只需要嵌入digest包就可以。digest()函數是一個散列函數,把一個R對象輸入進去可以輸出一個md5值或sha1等從而得到一個唯一的key值,當你key匹配到你保存的cache中的key時,你就可以繼續你的計算了,而不需要將算法重新運行,以下是一個使用例子:
cacheParallel <- function(){
vars <- 1:2
tmp <- clusterEvalQ(cl,
library(digest))
parSapply(cl, vars, function(var){
fn <- function(a) a^2
dg <- digest(list(fn, var))
cache_fn <-
sprintf("Cache_%s.Rdata",
dg)
if (file.exists(cache_fn)){
load(cache_fn)
}else{
var <- fn(var);
Sys.sleep(5)
save(var, file = cache_fn)
}
return(var)
})
}
這個例子很顯然在第二次運行的時候并沒有啟動Sys.sleep,而是檢測到了你的cache文件,加載了上一次計算后的cache,你就不必再計算Sys.sleep了,因為在上一次已經計算過了。
system.time(out <- cacheParallel())
# user system elapsed
# 0.003 0.001 5.079
out
# [1] 1 4
system.time(out <- cacheParallel())
# user system elapsed
# 0.001 0.004 0.046
out
# [1] 1 4
# To clean up the files just do:
file.remove(list.files(pattern = "Cache.+\.Rdata"))
載入平衡
任務載入
需要注意的是,無論parLapply還是foreach都是一個包裝(wrapper)的函數。這意味著他們不是直接執行并行計算的代碼,而是依賴于其他函數實現的。在parLapply中的定義如下:
parLapply <- function (cl = NULL, X, fun, ...)
{
cl <- defaultCluster(cl)
do.call(c, clusterApply(cl, x = splitList(X, length(cl)),
fun = lapply, fun, ...), quote = TRUE)
}
注意到splitList(X, length(cl))
,他會將任務分割成多個部分,然后將他們發送到不同的集群中。如果你有很多cache或者存在一個任務比其他worker中的任務都大,那么在這個任務結束之前,其他提前結束的worker都會處于空閑狀態。為了避免這一情況,你需要將你的任務盡量平均分配給每個worker。舉個例子,你要計算優化神經網絡的參數,這一過程你可以并行地以不同參數來訓練神經網絡,你應該將如下代碼:
# From the nnet example
parLapply(cl, c(10, 20, 30, 40, 50), function(neurons)
nnet(ir[samp,], targets[samp,],
size = neurons))
改為:
# From the nnet example
parLapply(cl, c(10, 50, 30, 40, 20), function(neurons)
nnet(ir[samp,], targets[samp,],
size = neurons))
內存載入
在大數據的情況下使用并行計算會很快的出現問題。因為使用并行計算會極大的消耗內存,你必須要注意不要讓你的R運行內存到達內存的上限,否則這將會導致崩潰或非常緩慢。使用Forks是一個控制內存上限的一個重要方法。Fork是通過內存共享來實現,而不需要額外的內存空間,這對性能的影響是很顯著的(我的系統時16G內存,8核心):
> rm(list=ls())
> library(pryr)
> library(magrittr)
> a <- matrix(1, ncol=10^4*2, nrow=10^4)
> object_size(a)
1.6 GB
> system.time(mean(a))
user system elapsed
0.338 0.000 0.337
> system.time(mean(a + 1))
user system elapsed
0.490 0.084 0.574
> library(parallel)
> cl <- makeCluster(4, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
user system elapsed
5.253 0.544 7.289
> system.time(parSapply(cl, 1:8,
function(x) mean(a + 1)))
user system elapsed
0.008 0.008 3.365
> stopCluster(cl); gc();
> cl <- makeCluster(4, type = "FORK")
> system.time(parSapply(cl, 1:8,
function(x) mean(a + 1)))
user system elapsed
0.009 0.008 3.123
> stopCluster(cl)
FORKs可以讓你并行化從而不用崩潰:
> cl <- makeCluster(8, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
user system elapsed
10.576 1.263 15.877
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
Error in checkForRemoteErrors(val) :
8 nodes produced errors; first error: cannot allocate vector of size 1.5 Gb
Timing stopped at: 0.004 0 0.389
> stopCluster(cl)
> cl <- makeCluster(8, type = "FORK")
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
user system elapsed
0.014 0.016 3.735
> stopCluster(cl)
當然,他并不能讓你完全解放,如你所見,當我們創建一個中間變量時也是需要消耗內存的:
> a <- matrix(1, ncol=10^4*2.1, nrow=10^4)
> cl <- makeCluster(8, type = "FORK")
> parSapply(cl, 1:8, function(x) {
+ b <- a + 1
+ mean(b)
+ })
Error in unserialize(node$con) : error reading from connection
內存建議
盡量使用rm()避免無用的變量
盡量使用gc()釋放內存。即使這在R中是自動執行的,但是當它沒有及時執行,在一個并行計算的情況下,如果沒有及時釋放內存,那么它將不會將內存返回給操作系統,從而影響了其他worker的執行。
通常并行化在大規模運算下很有用,但是,考慮到R中的并行化存在內存的初始化成本,所以考慮到內存的情況下,顯然小規模的并行化可能會更有用。
有時候在并行計算時,不斷做緩存,當達到上限時,換回串行計算。
你也可以手動的控制每個核所使用的內存數量,一個簡單的方法就是:memory.limit()/memory.size() = max cores
其他建議
一個常用的CPU核數檢測函數:
max(1, detectCores() - 1)
1
永遠不要使用set.seed(),使用clusterSetRNGStream()來代替設置種子,如果你想重現結果。
如果你有Nvidia 顯卡,你可以嘗試使用gputools 包進行GPU加速(警告:安裝可能會很困難)
當使用mice并行化時記得使用ibind()來合并項。
數據分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
CDA數據分析師證書考試體系(更新于2025年05月22日)
2025-05-26解碼數據基因:從數字敏感度到邏輯思維 每當看到超市貨架上商品的排列變化,你是否會聯想到背后的銷售數據波動?三年前在零售行 ...
2025-05-23在本文中,我們將探討 AI 為何能夠加速數據分析、如何在每個步驟中實現數據分析自動化以及使用哪些工具。 數據分析中的AI是什么 ...
2025-05-20當數據遇見人生:我的第一個分析項目 記得三年前接手第一個數據分析項目時,我面對Excel里密密麻麻的銷售數據手足無措。那些跳動 ...
2025-05-20在數字化運營的時代,企業每天都在產生海量數據:用戶點擊行為、商品銷售記錄、廣告投放反饋…… 這些數據就像散落的拼圖,而相 ...
2025-05-19在當今數字化營銷時代,小紅書作為國內領先的社交電商平臺,其銷售數據蘊含著巨大的商業價值。通過對小紅書銷售數據的深入分析, ...
2025-05-16Excel作為最常用的數據分析工具,有沒有什么工具可以幫助我們快速地使用excel表格,只要輕松幾步甚至輸入幾項指令就能搞定呢? ...
2025-05-15數據,如同無形的燃料,驅動著現代社會的運轉。從全球互聯網用戶每天產生的2.5億TB數據,到制造業的傳感器、金融交易 ...
2025-05-15大數據是什么_數據分析師培訓 其實,現在的大數據指的并不僅僅是海量數據,更準確而言是對大數據分析的方法。傳統的數 ...
2025-05-14CDA持證人簡介: 萬木,CDA L1持證人,某電商中廠BI工程師 ,5年數據經驗1年BI內訓師,高級數據分析師,擁有豐富的行業經驗。 ...
2025-05-13CDA持證人簡介: 王明月 ,CDA 數據分析師二級持證人,2年數據產品工作經驗,管理學博士在讀。 學習入口:https://edu.cda.cn/g ...
2025-05-12CDA持證人簡介: 楊貞璽 ,CDA一級持證人,鄭州大學情報學碩士研究生,某上市公司數據分析師。 學習入口:https://edu.cda.cn/g ...
2025-05-09CDA持證人簡介 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度、美團、阿里等 ...
2025-05-07相信很多做數據分析的小伙伴,都接到過一些高階的數據分析需求,實現的過程需要用到一些數據獲取,數據清洗轉換,建模方法等,這 ...
2025-05-06以下的文章內容來源于劉靜老師的專欄,如果您想閱讀專欄《10大業務分析模型突破業務瓶頸》,點擊下方鏈接 https://edu.cda.cn/g ...
2025-04-30CDA持證人簡介: 邱立峰 CDA 數據分析師二級持證人,數字化轉型專家,數據治理專家,高級數據分析師,擁有豐富的行業經驗。 ...
2025-04-29CDA持證人簡介: 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度,美團,阿里等 ...
2025-04-28CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-27數據分析在當今信息時代發揮著重要作用。單因素方差分析(One-Way ANOVA)是一種關鍵的統計方法,用于比較三個或更多獨立樣本組 ...
2025-04-25CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-25