Using the multiprocessing module to implement parallel computation in Python
Python's built-in `multiprocessing` package supports both local and remote concurrency, letting the programmer make full use of the multiple processors on a machine. This article focuses on its `Pool` object, which offers a convenient way to parallelize a function over a sequence of input values, distributing the inputs across worker processes (data parallelism). The example from the official documentation:

```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
```
This article gives a simple demonstration of the performance gain that parallelism can bring to data processing. Using `Pool`, I iterate in parallel over a CSV file containing one million rows, perform some simple arithmetic on it, and compare how fast my machine processes the data with different numbers of processes.
The code is as follows:

```python
import time
from multiprocessing import Pool
import pandas as pd


def read_csv_chunk(file_chunk):
    df0 = pd.read_csv(file_chunk)
    return df0


# A simple pass over the data: multiply two columns row by row and accumulate the sum
def f(df0):
    result_sum = 0
    for index, row in df0.iterrows():
        # positional access; plain row[5] is label-based and deprecated for ints in recent pandas
        result_sum += row.iloc[5] * row.iloc[0]
    return result_sum


# Parallel version; times == 0 means run serially as the baseline
def deal_csv_parallel(df0, times):
    if times == 0:
        return f(df0)
    chunk_size = len(df0) // times
    chunks = [df0[i:i + chunk_size] for i in range(0, len(df0), chunk_size)]
    with Pool(times) as p:
        results = p.map(f, chunks)
    return sum(results)


if __name__ == "__main__":
    # Path to the one-million-row CSV file
    file_path = 'output.csv'
    df = read_csv_chunk(file_path)
    Pool_size = [i for i in range(26)]
    run_time = []
    run_time_multiple = []
    for i in Pool_size:
        start_time = time.time()
        deal_csv_parallel(df, i)
        end_time = time.time()
        run_time.append(end_time - start_time)
    for item in run_time:
        run_time_multiple.append(f"{run_time[0] / item * 100}%")
    data = {'processes': Pool_size, 'run time': run_time, 'speedup': run_time_multiple}
    df_out = pd.DataFrame(data)
    print(df_out)
```
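One small caveat on the measurement itself: `time.time()` reads the wall clock, which the operating system may adjust while the benchmark runs, whereas `time.perf_counter()` is the monotonic, high-resolution clock intended for timing. A minimal sketch of the same timing pattern, with `time.sleep` standing in for the workload:

```python
import time

start = time.perf_counter()
time.sleep(0.1)  # stand-in for deal_csv_parallel(df, i)
elapsed = time.perf_counter() - start

print(f"elapsed: {elapsed:.3f}s")
```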
In the code above, the line

```python
chunks = [df0[i:i + chunk_size] for i in range(0, len(df0), chunk_size)]
```

splits the table into as many parts as there are processes, after which

```python
with Pool(times) as p:
    results = p.map(f, chunks)
```

runs the chunks in parallel. Running the code produces:
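As a side note, because the list comprehension steps through the frame in strides of `chunk_size = len(df0) // times`, it yields one extra, smaller remainder chunk whenever the length is not an exact multiple of `times`; `p.map` handles this fine, since it accepts any number of tasks. A minimal sketch of the same splitting logic on a plain Python list (the helper name and toy data are mine, not from the benchmark):

```python
# Split a sequence exactly the way the article's list comprehension does
def split_chunks(seq, times):
    chunk_size = len(seq) // times
    return [seq[i:i + chunk_size] for i in range(0, len(seq), chunk_size)]

data = list(range(10))
chunks = split_chunks(data, 3)  # chunk_size = 10 // 3 = 3

print([len(c) for c in chunks])  # → [3, 3, 3, 1]: three full chunks plus a remainder
```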
| Processes (0 = serial) | Run time (s) | Speedup vs. serial |
|---|---|---|
| 0 | 30.298478 | 100.00% |
| 1 | 30.566270 | 99.12% |
| 2 | 15.768265 | 192.15% |
| 3 | 10.124307 | 299.26% |
| 4 | 7.879102 | 384.54% |
| 5 | 7.495068 | 404.25% |
| 6 | 6.015692 | 503.66% |
| 7 | 5.712452 | 530.39% |
| 8 | 5.440717 | 556.88% |
| 9 | 5.770803 | 525.03% |
| 10 | 5.318123 | 569.72% |
| 11 | 5.124458 | 591.25% |
| 12 | 5.040725 | 601.07% |
| 13 | 5.147555 | 588.60% |
| 14 | 4.993004 | 606.82% |
| 15 | 5.116237 | 592.20% |
| 16 | 5.017882 | 603.81% |
| 17 | 5.150956 | 588.21% |
| 18 | 5.068639 | 597.76% |
| 19 | 4.979399 | 608.48% |
| 20 | 5.103571 | 593.67% |
| 21 | 5.076162 | 596.88% |
| 22 | 5.401071 | 560.97% |
| 23 | 5.360952 | 565.17% |
| 24 | 5.318242 | 569.71% |
| 25 | 5.748274 | 527.09% |
As the table shows, once the process count exceeds about 4 the gains taper off, and the speedup settles at roughly 6×. At its fastest, the parallel run takes about one sixth of the serial time, saving roughly 5/6 of it. For data processing of this kind, parallel computation clearly pays off handsomely.
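The plateau in the table is what Amdahl's law predicts: if a fraction p of the work can be parallelized, the best possible speedup on n processes is 1 / ((1 - p) + p / n), so beyond a few processes the serial remainder, plus chunking and inter-process overhead, dominates. A quick sketch (the 0.95 parallel fraction below is an assumed illustration, not a value measured from this benchmark):

```python
def amdahl_speedup(p, n):
    """Upper bound on speedup with parallel fraction p on n processes."""
    return 1.0 / ((1.0 - p) + p / n)

# Assumed parallel fraction of 0.95, purely illustrative:
# the speedup climbs quickly at first, then flattens out
for n in (1, 2, 4, 8, 16):
    print(n, round(amdahl_speedup(0.95, n), 2))
```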