
Parallel Computing in Python

Using Python's built-in multiprocessing module to implement parallel computation.

Python's built-in multiprocessing package supports both local and remote concurrency, letting programmers take full advantage of the multiple processors on a machine. This article focuses on its Pool object, which offers a convenient way to parallelize a function over a sequence of input values by distributing the input data across several processes (data parallelism). The example from the official documentation is:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
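Two details of the Pool API are worth keeping in mind for the benchmark below: if the processes argument is omitted, Pool starts os.cpu_count() worker processes, and map accepts an optional chunksize that controls how many inputs are shipped to a worker at a time. A minimal sketch (the square function and the input range are just placeholders):

import os
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # With no argument, Pool starts os.cpu_count() worker processes
    print('logical CPUs:', os.cpu_count())
    with Pool() as p:
        # chunksize batches inputs to cut down inter-process overhead
        print(p.map(square, range(10), chunksize=2))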

This article gives a simple demonstration of the performance gain that parallel computation brings to data processing. I will use Pool to iterate over a CSV file containing one million rows in parallel and perform some simple arithmetic on it, in order to compare how fast my machine processes the data with different numbers of processes.
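Since output.csv itself is not shown here, a hypothetical way to generate a comparable one-million-row file with six numeric columns (the column count and the random values are assumptions, chosen only so that the positional column accesses below have something to read) could be:

import numpy as np
import pandas as pd

# Hypothetical data generator: 1,000,000 rows, 6 numeric columns,
# so the benchmark's access to the 1st and 6th columns is valid.
rows, cols = 1_000_000, 6
rng = np.random.default_rng(0)
pd.DataFrame(rng.random((rows, cols))).to_csv('output.csv', index=False)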
The code is as follows:
import time
from multiprocessing import Pool
import pandas as pd


def read_csv_chunk(file_chunk):
    df0 = pd.read_csv(file_chunk)
    return df0


# Simple per-row computation over the data
def f(df0):
    result_sum = 0
    for index, row in df0.iterrows():
        # multiply the 1st column by the 6th column (positional access)
        result_sum += row.iloc[5] * row.iloc[0]
    return result_sum


# Parallel computation: times == 0 runs serially, otherwise uses `times` processes
def deal_csv_parallel(df0, times):
    result_sum = 0
    if times == 0:
        for index, row in df0.iterrows():
            result_sum += row.iloc[5] * row.iloc[0]
        return result_sum

    if times != 0:
        # split the DataFrame into chunks, roughly one per process
        chunk_size = len(df0) // times
        chunks = [df0[i:i + chunk_size] for i in range(0, len(df0), chunk_size)]
        with Pool(times) as p:
            results = p.map(f, chunks)
        return sum(results)


if __name__ == "__main__":

    # path of the data file
    file_path = 'output.csv'

    df = read_csv_chunk(file_path)

    Pool_size = [i for i in range(26)]
    run_time = []
    run_time_multiple = []

    # time the computation with 0 (serial) to 25 processes
    for i in Pool_size:
        start_time = time.time()

        deal_csv_parallel(df, i)

        end_time = time.time()

        t = end_time - start_time

        run_time.append(t)

    # efficiency of each run relative to the serial baseline
    for item in run_time:
        run_time_multiple.append(f"{run_time[0]/item*100}%")

    data = {'Processes': Pool_size, 'Run time (s)': run_time, 'Efficiency': run_time_multiple}
    df_out = pd.DataFrame(data)

    print(df_out)
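The splitting line discussed just below deserves a small illustration: when the row count is not an exact multiple of times, the list comprehension yields one extra, shorter chunk, and Pool.map simply schedules it as an additional task. A toy sketch with made-up numbers:

import pandas as pd

# Toy example: 11 rows split for times = 3 processes
df_demo = pd.DataFrame({'a': range(11)})
times = 3
chunk_size = len(df_demo) // times  # 3
chunks = [df_demo[i:i + chunk_size] for i in range(0, len(df_demo), chunk_size)]
print([len(c) for c in chunks])  # [3, 3, 3, 2] -> four chunks for three workers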

Here, the line chunks = [df0[i:i + chunk_size] for i in range(0, len(df0), chunk_size)] is responsible for splitting the DataFrame into roughly as many parts as there are processes, after which
with Pool(times) as p:
    results = p.map(f, chunks)

carries out the computation on the chunks in parallel. Running the code produces the following results (the row with 0 processes is the serial baseline):

Processes   Run time (s)   Efficiency
0           30.298478      100.00%
1           30.566270       99.12%
2           15.768265      192.15%
3           10.124307      299.26%
4            7.879102      384.54%
5            7.495068      404.25%
6            6.015692      503.66%
7            5.712452      530.39%
8            5.440717      556.88%
9            5.770803      525.03%
10           5.318123      569.72%
11           5.124458      591.25%
12           5.040725      601.07%
13           5.147555      588.60%
14           4.993004      606.82%
15           5.116237      592.20%
16           5.017882      603.81%
17           5.150956      588.21%
18           5.068639      597.76%
19           4.979399      608.48%
20           5.103571      593.67%
21           5.076162      596.88%
22           5.401071      560.97%
23           5.360952      565.17%
24           5.318242      569.71%
25           5.748274      527.09%

As the table shows, once the number of processes exceeds 4 the improvement starts to level off, and the run time settles at a roughly constant level. At its fastest point the parallel run cuts the time to about one sixth of the serial run, a saving of roughly 5/6 of the time, which shows how large the payoff of parallel computation can be for this kind of data processing.
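One likely reason the curve flattens is that the worker processes end up competing for a limited number of CPU cores, so workers beyond the core count mostly add scheduling and pickling overhead. A quick diagnostic to see how many logical cores the interpreter reports (this is also the default worker count for Pool()):

import os
import multiprocessing

# Number of logical CPUs the OS reports; Pool() uses this as its default
print('os.cpu_count():', os.cpu_count())
print('multiprocessing.cpu_count():', multiprocessing.cpu_count())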