Python 两种并发机制
Threading
threading(线程编程) 是一种通过在程序中创建多个执行单元(线程),来实现并发执行的技术。线程共享同一个进程的内存空间,并且可以并行处理任务,因此能够更好地利用多核CPU资源。
线程 是进程中的一个轻量级执行单元,一个进程可以包含多个线程。这些线程共享相同的内存和资源(如文件句柄),但每个线程有自己的栈和指令指针。由于共享内存,线程间通信非常快速,但也更容易出现竞争条件、死锁等同步问题。
threading经典应用场景
I/O操作 :在等待I/O操作完成(如文件读写、网络请求)时,其他线程可以继续执行任务,提高程序响应速度。
并发任务 :例如图形界面的响应式编程、并行数据处理。
An Example :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import threadingimport requestsdef fetch_data (url ): response = request.get(url) print (f'{response.status_code} ' ) urls = [url1, url2……] threads = [] for url in urls: t = threading.Thread(target=fetch_data, args=(url,)) threads.append(t) t.start() for t in threads: t.join()
多进程虽然能够处理并行任务,但每个进程的上下文切换会有额外的开销,这会影响程序的性能。因此,此类简单的请求工作更加适合使用多线程。
optimize :
concurrent 是 Python 标准库中的一个模块,专门用于支持 并发编程 。它提供了高层次的接口来管理多个任务的并发执行,主要包括线程和进程池的管理。
如果存在大量需要访问的url,可以使用concurrent.futures进行线程池创建。通过线程池,可以控制同时运行的线程数,避免一次性创建过多的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import concurrent.futuresimport requestsdef fetch_data (url ): try : response = requests.get(url) print (f"Finished download from: {url} , Status Code: {response.status_code} " ) except requests.RequestException as e: print (f"Error downloading {url} : {e} " ) urls = ["http://example.com" for _ in range (1000 )] with concurrent.futures.ThreadPoolExecutor(max_workers=20 ) as executor: futures = [executor.submit(fetch_data, url) for url in urls] for future in concurrent.futures.as_completed(futures): future.result() print ("All downloads complete." )
Multiprocessing
multiprocessing(多进程编程) 是通过在程序中创建多个独立的进程,每个进程都有自己的内存空间、资源和执行单元,以实现并行处理。多进程能够避免线程竞争带来的问题,因为各个进程之间是独立的。
进程 是操作系统中资源分配的基本单位。每个进程都有自己的地址空间、文件句柄、栈、堆等资源。不同于线程,进程之间不共享内存,这意味着它们更加安全,但也需要更复杂的通信方式(如管道、消息队列)来进行数据交换。
multiprocessing的典型应用场景:
高可靠性任务 :当某些任务必须完全隔离,不能因为其他任务的崩溃而被干扰时(如服务器进程、数据库进程等)。
CPU密集型任务 :需要大量的计算资源时(如图像处理、科学计算),使用多进程可以充分利用多核CPU的性能。
关于multiprocessing的基础应用,在之前的文章《python并行运算》中做过大致介绍。
An Example
并行矩阵运算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import multiprocessing as mpimport numpy as npdef matrix_square (matrix ): return np.dot(matrix, matrix) if __name__ == "__main__" : matrices = [np.random.rand(1000 , 1000 ) for _ in range (4 )] with mp.Pool(processes=4 ) as pool: results = pool.map (matrix_square, matrices) print ("Matrix calculations completed." )
并行蒙特卡洛模拟:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import multiprocessing as mpimport randomdef monte_carlo_pi (num_samples ): inside_circle = 0 for _ in range (num_samples): x, y = random.random(), random.random() if x**2 + y**2 <= 1 : inside_circle += 1 return inside_circle / num_samples * 4 if __name__ == "__main__" : num_samples = 1000000 num_processes = 4 with mp.Pool(processes=num_processes) as pool: pi_estimates = pool.map (monte_carlo_pi, [num_samples] * num_processes) pi_average = sum (pi_estimates) / num_processes print (f"Estimated Pi: {pi_average} " )
optimize :
concurrent.futures的ProcessPoolExecutor ,类似于multiprocessing.Pool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import concurrent.futuresimport randomdef monte_carlo_pi (num_samples ): inside_circle = 0 for _ in range (num_samples): x, y = random.random(), random.random() if x**2 + y**2 <= 1 : inside_circle += 1 return inside_circle / num_samples * 4 if __name__ == "__main__" : num_samples = 1000000 num_processes = 4 with concurrent.futures.ProcessPoolExecutor(max_workers=num_processes) as executor: futures = [executor.submit(monte_carlo_pi, num_samples) for _ in range (num_processes)] pi_estimates = [f.result() for f in concurrent.futures.as_completed(futures)] pi_average = sum (pi_estimates) / num_processes print (f"Estimated Pi: {pi_average} " )
concurrent.features优势
统一接口 :无论是使用 ThreadPoolExecutor 还是 ProcessPoolExecutor,它们的 API 是统一的,可以很方便地根据任务类型选择是使用线程还是进程。
简化代码 :相比 multiprocessing.Pool,concurrent.futures 提供了更高层的接口,如 submit() 和 as_completed(),减少了复杂度。
Future 对象 :submit() 方法返回 Future 对象,允许在任务完成后方便地获取结果、检查任务状态或处理异常。
自动资源管理 :concurrent.futures 支持上下文管理(with 语句),能够自动管理进程或线程池的生命周期,避免资源泄漏。