
Python利用多進程將大量數據放入有限內存的教程
這是一篇有關如何將大量的數據放入有限的內存中的簡略教程。
與客戶工作時,有時會發現他們的數據庫實際上只是一個csv或Excel文件倉庫,你只能將就著用,經常需要在不更新他們的數據倉庫的情況下完成工作。大部分情況下,如果將這些文件存儲在一個簡單的數據庫框架中或許更好,但時間可能不允許。這種方法對時間、機器硬件和所處環境都有要求。
下面介紹一個很好的例子:假設有一堆表格(沒有使用Neo4j、MongoDB或其他類型的數據庫,僅僅使用csvs、tsvs等格式存儲的表格),如果將所有表格組合在一起,得到的數據幀太大,無法放入內存。所以第一個想法是:將其拆分成不同的部分,逐個存儲。這個方案看起來不錯,但處理起來很慢。除非我們使用多核處理器。
目標
這里的目標是從所有職位中(大約1萬個),找出相關的的職位。將這些職位與政府給的職位代碼組合起來。接著將組合的結果與對應的州(行政單位)信息組合起來。然后用通過word2vec生成的屬性信息在我們的客戶的管道中增強已有的屬性。
這個任務要求在短時間內完成,誰也不愿意等待。想象一下,這就像在不使用標準的關系型數據庫的情況下進行多個表的連接。
數據
示例腳本
下面的是一個示例腳本,展示了如何使用multiprocessing來在有限的內存空間中加速操作過程。腳本的第一部分是和特定任務相關的,可以自由跳過。請著重關注第二部分,這里側重的是multiprocessing引擎。
#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool,cpu_count,Queue,Manager
# the data in one particular column was number in the form that horrible excel version
# of a number where '12000' is '12,000' with that beautiful useless comma in there.
# did I mention I excel bothers me?
# instead of converting the number right away, we only convert them when we need to
def median_maker(column):
return np.median([int(x.replace(',','')) for x in column])
# dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score']
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)
def job_title_location_matcher(title,location):
try:
related_title_score_df = dictionary_of_dataframes[title]
# we limit dataframe1 to only those related_titles that are above
# a previously established threshold
related_title_score_df = related_title_score_df[title_score_df['score']>80]
#we merge the related titles with another table and its codes
codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
# merge the two dataframes by the codes
merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
#limit the BLS data to the state we want
all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]
#calculate some summary statistics for the time we want
group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
#convert it all to strings so we can combine them all when writing to file
row_string = [str(x) for x in row]
return row_string
except:
# if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant
'do nothing'
這里發生了神奇的事情:
#runs the function and puts the answers in the queue
def worker(row, q):
ans = job_title_location_matcher(row[0],row[1])
q.put(ans)
# this writes to the file while there are still things that could be in the queue
# this allows for multiple processes to write to the same file without blocking eachother
def listener(q):
f = open(filename,'wb')
while 1:
m = q.get()
if m =='kill':
break
f.write(','.join(m) + 'n')
f.flush()
f.close()
def main():
#load all your data, then throw out all unnecessary tables/columns
filename = 'skill_TEST_POOL.txt'
#sets up the necessary multiprocessing tasks
manager = Manager()
q = manager.Queue()
pool = Pool(cpu_count() + 2)
watcher = pool.map_async(listener,(q,))
jobs = []
#titles_states is a dataframe of millions of job titles and states they were found in
for i in titles_states.iloc:
job = pool.map_async(worker, (i, q))
jobs.append(job)
for job in jobs:
job.get()
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
由于每個數據幀的大小都不同(總共約有100Gb),所以將所有數據都放入內存是不可能的。通過將最終的數據幀逐行寫入內存,但從來不在內存中存儲完整的數據幀。我們可以完成所有的計算和組合任務。這里的“標準方法”是,我們可以僅僅在“job_title_location_matcher”的末尾編寫一個“write_line”方法,但這樣每次只會處理一個實例。根據我們需要處理的職位/州的數量,這大概需要2天的時間。而通過multiprocessing,只需2個小時。
雖然讀者可能接觸不到本教程處理的任務環境,但通過multiprocessing,可以突破許多計算機硬件的限制。本例的工作環境是c3.8xl ubuntu ec2,硬件為32核60Gb內存(雖然這個內存很大,但還是無法一次性放入所有數據)。這里的關鍵之處是我們在60Gb的內存的機器上有效的處理了約100Gb的數據,同時速度提升了約25倍。通過multiprocessing在多核機器上自動處理大規模的進程,可以有效提高機器的利用率。也許有些讀者已經知道了這個方法,但對于其他人,可以通過multiprocessing能帶來非常大的收益。
數據分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
2025 年,數據如同數字時代的 DNA,編碼著人類社會的未來圖景,驅動著商業時代的運轉。從全球互聯網用戶每天產生的2.5億TB數據, ...
2025-05-27CDA數據分析師證書考試體系(更新于2025年05月22日)
2025-05-26解碼數據基因:從數字敏感度到邏輯思維 每當看到超市貨架上商品的排列變化,你是否會聯想到背后的銷售數據波動?三年前在零售行 ...
2025-05-23在本文中,我們將探討 AI 為何能夠加速數據分析、如何在每個步驟中實現數據分析自動化以及使用哪些工具。 數據分析中的AI是什么 ...
2025-05-20當數據遇見人生:我的第一個分析項目 記得三年前接手第一個數據分析項目時,我面對Excel里密密麻麻的銷售數據手足無措。那些跳動 ...
2025-05-20在數字化運營的時代,企業每天都在產生海量數據:用戶點擊行為、商品銷售記錄、廣告投放反饋…… 這些數據就像散落的拼圖,而相 ...
2025-05-19在當今數字化營銷時代,小紅書作為國內領先的社交電商平臺,其銷售數據蘊含著巨大的商業價值。通過對小紅書銷售數據的深入分析, ...
2025-05-16Excel作為最常用的數據分析工具,有沒有什么工具可以幫助我們快速地使用excel表格,只要輕松幾步甚至輸入幾項指令就能搞定呢? ...
2025-05-15數據,如同無形的燃料,驅動著現代社會的運轉。從全球互聯網用戶每天產生的2.5億TB數據,到制造業的傳感器、金融交易 ...
2025-05-15大數據是什么_數據分析師培訓 其實,現在的大數據指的并不僅僅是海量數據,更準確而言是對大數據分析的方法。傳統的數 ...
2025-05-14CDA持證人簡介: 萬木,CDA L1持證人,某電商中廠BI工程師 ,5年數據經驗1年BI內訓師,高級數據分析師,擁有豐富的行業經驗。 ...
2025-05-13CDA持證人簡介: 王明月 ,CDA 數據分析師二級持證人,2年數據產品工作經驗,管理學博士在讀。 學習入口:https://edu.cda.cn/g ...
2025-05-12CDA持證人簡介: 楊貞璽 ,CDA一級持證人,鄭州大學情報學碩士研究生,某上市公司數據分析師。 學習入口:https://edu.cda.cn/g ...
2025-05-09CDA持證人簡介 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度、美團、阿里等 ...
2025-05-07相信很多做數據分析的小伙伴,都接到過一些高階的數據分析需求,實現的過程需要用到一些數據獲取,數據清洗轉換,建模方法等,這 ...
2025-05-06以下的文章內容來源于劉靜老師的專欄,如果您想閱讀專欄《10大業務分析模型突破業務瓶頸》,點擊下方鏈接 https://edu.cda.cn/g ...
2025-04-30CDA持證人簡介: 邱立峰 CDA 數據分析師二級持證人,數字化轉型專家,數據治理專家,高級數據分析師,擁有豐富的行業經驗。 ...
2025-04-29CDA持證人簡介: 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度,美團,阿里等 ...
2025-04-28CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-27數據分析在當今信息時代發揮著重要作用。單因素方差分析(One-Way ANOVA)是一種關鍵的統計方法,用于比較三個或更多獨立樣本組 ...
2025-04-25