熱線電話:13121318867

登錄
首頁精彩閱讀加快python算法的四個方法:Dask篇
加快python算法的四個方法:Dask篇
2020-06-08
收藏

CDA數據分析師 出品

相信大家在做一些算法經常會被龐大的數據量所造成的超多計算量需要的時間而折磨的痛苦不已,接下來我們圍繞四個方法來幫助大家加快一下python的計算時間,減少大家在算法上的等待時間。今天給大家講述最后一方面的內容,關于Dask的方法運用。

1.簡介

隨著對機器學習算法并行化的需求不斷增加,由于數據大小甚至模型大小呈指數級增長,如果我們擁有一個工具,可以幫助我們并行化處理Pandas的DataFrame,可以并行化處理Numpy的計算,甚至并行化我們的機器學習算法(可能是來自sklearn和Tensorflow的算法)也沒有太多的麻煩,那它對我們會非常有幫助。

好消息是確實存在這樣的庫,其名稱為Dask。Dask是一個并行計算庫,它不僅有助于并行化現有的機器學習工具(Pandas和Numpy)(即使用高級集合),而且還有助于并行化低級任務/功能,并且可以通過制作任務圖來處理這些功能之間的復雜交互。[ 即使用低級調度程序 ]這類似于Python的線程或多處理模塊。

他們也有一個單獨的機器學習庫dask-ml,這與如現有的庫集成如sklearn,xgboost和tensorflow。

Dask通過繪制任務之間的交互圖來并行化分配給它的任務。使用Dask的.visualize方法來可視化你的工作將非常有幫助,該方法可用于所有數據類型以及你計算的復雜任務鏈。此方法將輸出你的任務圖,并且如果你的任務在每個級別具有多個節點(即,你的任務鏈結構在多個層次上具有許多獨立的任務,例如數據塊上的并行任務),然后Dask將能夠并行化它們。

注意: Dask仍是一個相對較新的項目。它還有很長的路要走。不過,如果你不想學習全新的API(例如PySpark),Dask是你的最佳選擇,將來肯定會越來越好。Spark / PySpark仍然遙遙領先,并且仍將繼續改進。這是一個完善的Apache項目。

2.數據類型

Dask中的每種數據類型都提供現有數據類型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。這些數據類型可以大于你的內存,Dask將以Blocked方式對數據并行(y)運行計算。Blocked從某種意義上說,它們是通過執行許多小的計算(即,以塊為單位)來執行大型計算的,而塊的數量為chunks的總數。

a)數組:

網格中的許多Numpy數組作為Dask數組

Dask Array對非常大的數組進行操作,將它們劃分為塊并并行執行這些塊。它有許多可用的numpy方法,你可以使用這些方法來加快速度。但是其中一些沒有實現。

只要支持numpy切片,Dask Array就可以從任何類似數組結構中讀取數據,并且可以通過使用并且通過使用Dask . Array .from_array方法具有.shape屬性。它還可以讀取.npy和.zarr文件。

import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它會生成大小為(1000,1000)的塊darr.npartitioins# 100

當你的數組真的很重時(即它們無法放入內存)并且numpy對此無能為力時,可以使用它。因此,Dask將它們分為數組塊并為你并行處理它們。

現在,Dask對每種方法進行惰性評估。因此,要實際計算函數的值,必須使用.compute方法。它將以塊為單位并行計算結果,同時并行化每個獨立任務。

result = darr.compute

1)元素數量較少時,Numpy比Dask快;2)Dask接管了Numpy,耗時約1e7個元素;3)Numpy無法產生更多元素的結果,因為它無法將它們存儲在內存中。

b)DataFrame

5個Pandas DataFrame在一個Dask DataFrame中提供每月數據(可以來自diff文件)

與Dask Arrays相似,Dask DataFrames通過將文件劃分為塊并將這些塊的計算函數并行化,從而對不適合內存非常大的數據文件進行并行計算。

import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)

現在,你可以應用/使用pandas庫中可用的大多數功能,并在此處應用。

agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #請查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head

c)Bag:

Dask Bag包并行處理包含多個數據類型元素Python的list相似對象。當你嘗試處理一些半結構化數據(例如JSON Blob或日志文件)時,此功能很有用。

import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)

Daskbag逐行讀取,.take方法輸出指定行數的元組。

Dask Bag在這樣的Python對象集合上實現例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成這個任務,占用的內存很小。它類似于PyToolz的并行版本或PySpark RDD的Python版本。

filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute

3.延時

如果你的任務有點簡單,并且你不能或不想使用這些高級集合來執行操作,則可以使用低級調度程序,該程序可幫助你使用dask.delayed接口并行化代碼/算法。dask.delayed也可以進行延遲計算。

import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum

你可以根據需要在這些函數之間添加復雜的交互,使用上一個任務的結果作為下一個任務的參數。Dask不會立即計算這些函數,而是會為你的任務繪制圖形,有效地合并你使用的函數之間的交互。

inputs = list(np.arange(1, 11))#將外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 計算輸入的sq并保存 # 延遲計算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加兩個連續 # 結果從prev步驟inputs = tempresult = sum(inputs) # 將所有prev步驟的結果相加results.compute

你可以將延遲添加到具有許多可能的小塊的任何可并行化代碼中,從而獲得加速的效果。它可以是你想計算的許多函數,例如上面的示例,或者可以使用并行讀取多個文件pandas.read_csv。

4.分布式

首先,到目前為止,我們一直使用Dask的默認調度器來計算任務的結果。但是你可以根據需要從Dask提供的選項中更改它們。

Dask 帶有四個可用的調度程序:

· threaded:由線程池支持的調度程序

· processes:由進程池支持的調度程序

· single-threaded(又名" sync"):同步調度程序,用于調試

· distributed:用于在多臺計算機上執行圖形的分布式調度程序

result.compute(scheduler="single-threaded") #用于調試# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(從官方網頁)#當被稱為GIL的函數釋放時,線程任務將工作得很好,而多處理總是啟動時間較慢,并且在任務之間需要大量的通信。# 你可以通過其中一個得到調度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#單線程的最后一個

但是,Dask還有一個調度器,dask.distributed由于以下原因它可能是首選使用的:

1. 它提供了異步API的訪問,尤其是Future,

1. 它提供了一個診斷儀表板,可以提供有關性能和進度的寶貴見解

1. 它可以更復雜地處理數據位置,因此在需要多個流程的工作負載上,它比多處理調度程序更有效。

你可以創建一個Dask的dask.distributed調度程序,通過導入和創建客戶端實現分布式調度器

from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以導航到http://localhost:8787/status 查看# 診斷儀表板,如果你有Bokeh安裝的話

現在,你可以使用client.submit方法,將函數和參數作為其參數,從而將任務提交到此集群。然后我們可以使用client.gather或.result方法來收集結果。

sent = client.submit(sq, 4) # sq: square 函數result = client.gather(sent) # 或者 sent.result

你也可以僅使用dask.distributed.progress來查看當前單元格中任務的進度。你還可以明確選擇使用dask.distributed.wait來等待任務完成。

Note: (Local Cluster)有時您會注意到Dask正在超出內存使用,即使它正在劃分任務。它可能發生在您身上,因為您試圖在數據集上使用的函數需要您的大部分數據進行處理,而多重處理可能使情況變得更糟,因為所有工作人員都可能試圖將數據集復制到內存中。這可能發生在聚合的情況下?;蛘吣赡芟胂拗艱ask只使用特定數量的內存。

在這些情況下,您可以使用Dask.distributed。LocalCluster參數,并將它們傳遞給Client,從而使用本地機器的核心構建LocalCluster。

from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client

'scheduler_port=0'和' stics_port=0'將為這個特定的客戶端選擇隨機端口號。在'process =False'的情況下,dask的客戶端不會復制數據集,這可能發生在您所創建的每個進程中。您可以根據自己的需要或限制對客戶機進行調優,要了解更多信息,可以查看LocalCluster的參數。您還可以在同一臺機器的不同端口上使用多個客戶機。

5.機器學習

Dask也有一個庫,可以幫助并允許大多數流行的機器學習庫,例如sklearn,tensorflow和xgboost。

機器學習中,你可能會遇到幾個不同的問題。而具體的策略取決于你面臨的問題:

1. 大型模型:數據適合放入RAM,但是訓練時間太長。許多超參數組合,許多模型的大型集合等。

1. 大型數據集:數據大于RAM,并且不能選擇抽樣。

因此,你應該:

· 對于內存中適合的問題,只需使用scikit-learn(或你最喜歡的ML庫)即可;

· 對于大型模型,請使用dask_ml.joblib和你最喜歡的scikit-learn估算器

· 對于大型數據集,請使用dask_ml估算器。

a)預處理:

dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(穩健標量),StandardScalar(標準標量),LabelEncoder(標簽編碼器),OneHotEncoder(獨熱編碼),PolynomialFeatures(多項式特性)等等,以及它的一些自己的如Categorizer(分類器),DummyEncoder(虛擬編碼),OrdinalEncoder(序數編碼器)等。

你可以像使用PandasDataFrame一樣使用它們。

from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])

你可以使用Dask的DataFrame上的預處理方法,從Sklearn的Make_pipeline方法生成一個管道。

b)超參數搜索:

Dask具有sklearn用于進行超參數搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。

from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)

而且,如果要partial_fit與估算器一起使用,則可以使用dask-ml的IncrementalSearchCV。

注意:(來自Dask)如果要使用后擬合任務(如評分和預測),則使用基礎估計量評分方法。如果你的估算器(可能來自sklearn )無法處理大型數據集,則將估算器包裝在" dask_ml.wrappers.ParallelPostFit" 周圍。它可以并行化" predict"," predict_proba"," transform"等方法。

c)模型/估計器:

Dask具有一些線性模型(的LinearRegression,LogisticRegression等),一些聚類模型(Kmeans和SpectralClustering),一種使用Tensorflow 的方法,使用Dask訓練XGBoost模型的方法。

如果訓練數據較小,則可以將sklearn的模型與結合使用Dask,如果ParallelPostFit包裝數據較大,則可以與包裝器一起使用(如果測試數據較大)。

from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)

如果數據集不大但模型很大,則可以使用joblib。sklearns編寫了許多用于并行執行的算法(你可能使用過n_jobs=-1參數),joblib該算法利用線程和進程來并行化工作負載。要用于Dask并行化,你可以創建一個Client(客戶端)(必須),然后使用with joblib.parallel_backend('dask'):包裝代碼。

import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代碼

注意:DASK JOBLIB后端對于擴展CPU綁定的工作負載非常有用; 在RAM中包含數據集的工作負載,但具有許多可以并行完成的單獨操作。要擴展到受RAM約束的工作負載(大于內存的數據集),你應該使用Dask的內置模型和方法。

而且,如果你訓練的數據太大而無法容納到內存中,那么你應該使用Dask的內置估算器來加快速度。你也可以使用Dask的wrapper.Incremental它使用基礎估算器的partial_fit方法對整個數據集進行訓練,但實際上是連續的。

Dask的內置估計器很好地擴展用于大型數據集與多種優化算法,如admm,lbfgs,gradient_descent等,并且正則化器如 L1,L2,ElasticNet等。

from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")

經過4期的內容講解,你學會加快Python算法的四種方法了么?

數據分析咨詢請掃描二維碼

若不方便掃碼,搜微信號:CDAshujufenxi

數據分析師資訊
更多

OK
客服在線
立即咨詢
日韩人妻系列无码专区视频,先锋高清无码,无码免费视欧非,国精产品一区一区三区无码
客服在線
立即咨詢