0%

threading or multiprocessing?

Python 两种并发机制

Threading

threading(线程编程) 是一种通过在程序中创建多个执行单元(线程),来实现并发执行的技术。线程共享同一个进程的内存空间,并且可以并行处理任务,因此能够更好地利用多核CPU资源。

线程是进程中的一个轻量级执行单元,一个进程可以包含多个线程。这些线程共享相同的内存和资源(如文件句柄),但每个线程有自己的栈和指令指针。由于共享内存,线程间通信非常快速,但也更容易出现竞争条件、死锁等同步问题。

threading经典应用场景

  • I/O操作:在等待I/O操作完成(如文件读写、网络请求)时,其他线程可以继续执行任务,提高程序响应速度。
  • 并发任务:例如图形界面的响应式编程、并行数据处理。

An Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import requests

def fetch_data(url):
response = request.get(url)
print(f'{response.status_code}')

urls = [url1, url2……]

threads = []
for url in urls:
t = threading.Thread(target=fetch_data, args=(url,))
threads.append(t)
t.start()

for t in threads:
t.join()

多进程虽然能够处理并行任务,但每个进程的上下文切换会有额外的开销,这会影响程序的性能。因此,此类简单的请求工作更加适合使用多线程。

optimize
concurrent 是 Python 标准库中的一个模块,专门用于支持 并发编程。它提供了高层次的接口来管理多个任务的并发执行,主要包括线程和进程池的管理。
如果存在大量需要访问的url,可以使用concurrent.futures进行线程池创建。通过线程池,可以控制同时运行的线程数,避免一次性创建过多的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import concurrent.futures
import requests

# 任务函数:访问URL
def fetch_data(url):
try:
response = requests.get(url)
print(f"Finished download from: {url}, Status Code: {response.status_code}")
except requests.RequestException as e:
print(f"Error downloading {url}: {e}")

# URL列表(假设有1000个)
urls = ["http://example.com" for _ in range(1000)]

# 使用ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
# 提交所有任务
futures = [executor.submit(fetch_data, url) for url in urls]

# 等待任务完成
for future in concurrent.futures.as_completed(futures):
future.result() # 可以获取返回结果,若不关心结果,可以省略这行

print("All downloads complete.")

Multiprocessing

multiprocessing(多进程编程) 是通过在程序中创建多个独立的进程,每个进程都有自己的内存空间、资源和执行单元,以实现并行处理。多进程能够避免线程竞争带来的问题,因为各个进程之间是独立的。

进程是操作系统中资源分配的基本单位。每个进程都有自己的地址空间、文件句柄、栈、堆等资源。不同于线程,进程之间不共享内存,这意味着它们更加安全,但也需要更复杂的通信方式(如管道、消息队列)来进行数据交换。

multiprocessing的典型应用场景:

  • 高可靠性任务:当某些任务必须完全隔离,不能因为其他任务的崩溃而被干扰时(如服务器进程、数据库进程等)。
  • CPU密集型任务:需要大量的计算资源时(如图像处理、科学计算),使用多进程可以充分利用多核CPU的性能。

关于multiprocessing的基础应用,在之前的文章《python并行运算》中做过大致介绍。
An Example
并行矩阵运算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing as mp
import numpy as np

# 定义矩阵计算任务
def matrix_square(matrix):
return np.dot(matrix, matrix)

if __name__ == "__main__":
# 创建一个随机矩阵
matrices = [np.random.rand(1000, 1000) for _ in range(4)]

# 使用进程池进行并行计算
with mp.Pool(processes=4) as pool:
results = pool.map(matrix_square, matrices)

print("Matrix calculations completed.")

并行蒙特卡洛模拟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing as mp
import random

# 定义蒙特卡罗模拟任务
def monte_carlo_pi(num_samples):
inside_circle = 0
for _ in range(num_samples):
x, y = random.random(), random.random()
if x**2 + y**2 <= 1:
inside_circle += 1
return inside_circle / num_samples * 4 # 估算π值

if __name__ == "__main__":
# 每个进程处理100万次采样
num_samples = 1000000
num_processes = 4

# 使用进程池并行运行蒙特卡罗模拟
with mp.Pool(processes=num_processes) as pool:
pi_estimates = pool.map(monte_carlo_pi, [num_samples] * num_processes)

# 计算平均值
pi_average = sum(pi_estimates) / num_processes
print(f"Estimated Pi: {pi_average}")

optimize
concurrent.futures的ProcessPoolExecutor ,类似于multiprocessing.Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import concurrent.futures
import random

# 定义蒙特卡罗模拟任务
def monte_carlo_pi(num_samples):
inside_circle = 0
for _ in range(num_samples):
x, y = random.random(), random.random()
if x**2 + y**2 <= 1:
inside_circle += 1
return inside_circle / num_samples * 4 # 估算π值

if __name__ == "__main__":
num_samples = 1000000 # 每个进程采样的数量
num_processes = 4 # 进程数

# 使用 ProcessPoolExecutor 并行执行任务
with concurrent.futures.ProcessPoolExecutor(max_workers=num_processes) as executor:
futures = [executor.submit(monte_carlo_pi, num_samples) for _ in range(num_processes)]
pi_estimates = [f.result() for f in concurrent.futures.as_completed(futures)]

# 计算平均估算值
pi_average = sum(pi_estimates) / num_processes
print(f"Estimated Pi: {pi_average}")

concurrent.features优势

  1. 统一接口:无论是使用 ThreadPoolExecutor 还是 ProcessPoolExecutor,它们的 API 是统一的,可以很方便地根据任务类型选择是使用线程还是进程。

  2. 简化代码:相比 multiprocessing.Pool,concurrent.futures 提供了更高层的接口,如 submit() 和 as_completed(),减少了复杂度。

  3. Future 对象:submit() 方法返回 Future 对象,允许在任务完成后方便地获取结果、检查任务状态或处理异常。

  4. 自动资源管理:concurrent.futures 支持上下文管理(with 语句),能够自动管理进程或线程池的生命周期,避免资源泄漏。