说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gairuo123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
Python 中的 GIL(Global Interpreter Lock,全局解释器锁,见前文介绍)的存在,也就是多线程的时候,同一时间可能只能有一个线程在 CPU 上运行,而且是单个 CPU 上运行,不管你的 CPU 有多少核数。如果想要充分地使用多核 CPU 的资源,在 Python 中大部分情况需要使用多进程。可以访问 Python 多任务 了解背景知识。
一个程序至少有一个进程,一个进程至少有一个线程(必须依存在进程中)。每个进程都有一个唯一 pid (进程 ID),一旦 kill 掉,程序就关闭执行了。
Python 的 os 模块封装了常见的系统调用,通过以下方式可获取到当前的进程号 pid(每次执行随机生成):
import os
pid = os.getpid() # 获取当前进程的进程ID
ppid = os.getppid() # 返回父进程的ID
print(f'{pid=},{ppid=}')
# pid=95478,ppid=92960
用命令行查看处理 Python 的进程:
ps -ef # 查看电脑所有进程
ps |grep python # 查看所有 Python 进程
kill -9 pid 91779 # 根据 pid 进程编号,终止进程
# 运行中系统的进程动态实时视图 按q退出
top
Python 多进程的特点:
Python 创建进程,可以归纳为三种:fork,multiprocessing 以及进程池 Pool。
内置 os 模块 fork() 方法可以创建子进程。调用 fork() 将返回两个 pid。操作系统自动把当前进程(称为父进程,不为 0 的值,这是其子进程ID)复制了一份(称为子进程,返回 0),子进程会复制父进程的数据信息,而后程序就分两个进程继续运行后面的程序。
import os
# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
fork = os.fork() # 复制一个子进行,返回两次
pid = os.getpid() # 获取当前进程的进程ID
ppid = os.getppid() # 返回父进程的ID
print(f'{fork=}')
print(f'当前: {pid=}')
print(f'父进程: {ppid=}', end='\n'*2)
'''
fork=94162
当前: pid=94161
父进程: ppid=92960
fork=0
当前: pid=94162
父进程: ppid=94161
'''
一个父进程可以 fork 出很多子进程,父进程要记下每个子进程的ID,而子进程只需要调用 getppid() 就可以拿到父进程的 ID。另外,要注意的是父子进程的执行先后顺序完全取决于操作系统的调度算法,是不确定的。在父子进程中可以继续 fork。
通过上边的原理,我们可以将一个任务分配两个非常大的计算任务由两个进程来分别来做。
import os
fork = os.fork() # 复制一个子进行
if fork == 0: # 子进程执行
pid = os.getpid() # 获取当前进程的进程ID
print(f'{pid=}开始执行...')
n1 = sum(range(123**4))
print(f'{pid=}执行完成, 结果为{n1=}')
else: # 父进程执行
pid = os.getpid() # 获取当前进程的进程ID
print(f'{pid=}开始执行...')
n2 = sum(range(456**3))
print(f'{pid=}执行完成, 结果为{n2=}')
'''
pid=95373开始执行...
pid=95374开始执行...
pid=95373执行完成, 结果为n2=4495303886411520
pid=95374执行完成, 结果为n1=26194547099688120
'''
# 期间在终端执行命令,可以看到进程信息
'''
(py310) % ps |grep python
94992 ttys000 0:00.00 grep python
95373 ttys001 0:53.55 .../test.py
95374 ttys001 0:53.50 .../test.py
'''
由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。
想要结束进程也可以按 ^C
(ctr+c)来结束。
编写多进程的服务程序,Unix/Linux(mac) 是最好的选择,由于 Windows 没有 fork 调用, Python 为了施展期是跨平台我,内置的 multiprocessing 模块就提供了非常方便而又通用的操作。
multiprocessing 模块提供了一个 Process 类来抽象一个进程对象:
from multiprocessing import Process
import os
# 进程要执行的代码
def task_func(num):
pid = os.getpid() # 获取当前进程的进程ID
print(f'{pid=}开始执行({num=})...')
n = sum(range(num))
print(f'{pid=}执行完成, 结果为{n=}')
if __name__=='__main__':
nums = [123**4, 456**3, 234*2, 567**2]
pid = os.getpid() # 获取当前进程的进程IDn
print(f'{pid=}开始执行...')
procs = []
# 用参数实例化
for n in nums:
p = Process(target=task_func, args=(n,))
procs.append(p)
p.start()
# 完成执行
for i in procs:
i.join()
print('全部完成!')
'''
pid=96283开始执行...
pid=96286开始执行(num=94818816)...
pid=96287开始执行(num=468)...
pid=96287执行完成, 结果为n=109278
pid=96285开始执行(num=228886641)...
pid=96288开始执行(num=321489)...
pid=96288执行完成, 结果为n=51677427816
pid=96286执行完成, 结果为n=4495303886411520
pid=96285执行完成, 结果为n=26194547099688120
全部完成!
'''
如果我们创建一个 process 对象,通过 start() 告诉它开始处理,然后,进程将运行并返回其结果。之后,我们通过 join() 函数告诉流程完成。
利用多线程时,一般都先让子线程调用 start() ,然后再去调用 join(),意为等待进程结束,让主进程等待子进程结束才继续走后续的逻辑,如果不调用的话上例中「全部完成!」显示逻辑将得不到执行。
总结:
注意,这种方法下,if __name__ == '__main__'
是必须的。
前两种方法需要我们手动指定进程的数量,进程执行完释放后如果高效让下个进程再利用,便成了问题。
如果处理的任务非常多,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的Pool方法。
from multiprocessing import Pool
import os
# 进程要执行的代码
def task_func(num):
pid = os.getpid() # 获取当前进程的进程ID
print(f'{pid=}开始执行({num=})...')
n = sum(range(num))
print(f'{pid=}执行完成, 结果为{n=}')
if __name__=='__main__':
pid = os.getpid() # 获取当前进程的进程IDn
print(f'{pid=}开始执行...')
cpu_cnt = os.cpu_count()
print(f'{cpu_cnt=}')
p = Pool(cpu_cnt) # 创建一个进程池, 为 CPU 数量
for i in range(99):
# 异步非阻塞方式,还可用 p.apply/p.map 等
p.apply_async(task_func, args=(i,))
p.close() # 关闭进程池,不能继续在进程池中添加进程
p.join() # 等待所有子进程执行完毕,再执行下面的代码
print('全部完成!')
'''
pid=97731开始执行...
cpu_cnt=8
pid=97734开始执行(num=0)...
pid=97734执行完成, 结果为n=0
pid=97734开始执行(num=1)...
pid=97734执行完成, 结果为n=0
pid=97734开始执行(num=2)...
pid=97734执行完成, 结果为n=1
...
pid=97734开始执行(num=98)...
pid=97734执行完成, 结果为n=4753
全部完成!
'''
以上共创建有 8 个进程的进程池,将满了的时候就等其中执行完的释放再把新任务添加进去。Pool 的默认大小是 CPU 的核数,可以根据自己的需要指定进程数量。
multiprocessing.dummy
是 multiprocessing 的一个子库,是以相同 API 实现的多线程模块。multiprocess.dummy
的接口与 multiprocess 的接口相同,使得很方便在多进程与多线程间切换(特别是在一开始你不知道程序到底CPU密集型还是IO密集型的时候)。
# 多进程 场景CPU密集程序
from multiprocessing import Pool
# 多线程 场景IO密集程序,一个进程
from multiprocessing.dummy import Pool
在不修改程序主体代码的情况下,两种方式都跑一下,哪个速度快用哪个就行了。
Python3.2带来了 concurrent.futures 模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。
concurrent.futures 会以子进程的形式,平行的运行多个 Python 解释器,从而令 Python 程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
# 可轻松切换为线程池
# from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool
import os
import time
# 进程要执行的代码
def task_func(num):
pid = os.getpid() # 获取当前进程的进程ID
n = sum(range(num))
print(f'{pid=} 执行完成, 结果为{n=}')
return n
if __name__=='__main__':
pid = os.getpid() # 获取当前进程的进程IDn
print(f'{pid=}开始执行...')
g = 0 # 全局变量,初始值
start = time.time()
with Pool(max_workers=5) as executor:
# 还有 executor.map()
for i in range(9999):
e = executor.submit(task_func, i)
g += e.result() # 得到返回结果进行计算
print(f'全部完成!{g=}, 用时 {time.time()-start}')
'''
pid=19640开始执行...
pid=19643 执行完成, 结果为n=0
pid=19643 执行完成, 结果为n=1
pid=19643 执行完成, 结果为n=3
...
pid=29713 执行完成, 结果为n=49965006
pid=29713 执行完成, 结果为n=49975003
全部完成!g=166566684999, 用时 6.832391977310181
'''
可轻松切换为线程池 ThreadPoolExecutor。
subprocess 模块允许你生成新的进程,连接它们的输入、输出、错误管道,并且获取它们的返回码。我们通过标准库中的 subprocess 包来 fork 一个子进程,并且运行一个外部的程序。subprocess 包中定义有数个创建子进程的函数,这些函数分别以不同的方式创建子进程,所欲我们可以根据需要来从中选取一个使用。另外 subprocess 还提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
通俗地说就是通过这个模块,你可以在 Python 的代码里执行操作系统级别的命令,比如“ipconfig”、“du -sh”等等。subprocess模块替代了一些老的模块和函数。
import subprocess
c = subprocess.run('ls', shell=True)
print(f'{c=}')
print(f'{c.returncode=}')
print(f'{c.args=}')
print(f'{c.stdout=}')
'''
Applications Documents Library ...
Desktop Downloads Movies ...
c=CompletedProcess(args='ls', returncode=0)
c.returncode=0
c.args='ls'
c.stdout=None
'''
大多数情况下,推荐使用 run() 方法调用子进程,执行操作系统命令。在更高级的使用场景,你还可以使用Popen接口。对于更进阶的用例,也可以使用底层的 Popen 接口。 run() 函数是在 Python 3.5 被添加的。
在上边的多进程案例中,各个进程的执行是独立的,各自返回了结果,它们之间没有交互。但实际业务中,我们将一个大的事项拆分为多个事件并行,过程和结果可能是有数据依赖和数据交互的。这就涉及进程间的通信问题。
Python 多线程之间共享变量很简单,直接定义全局 global 变量即可。主进程与子进程是并发执行的,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用:
multiprocessing.Value("d",10.0)
,数值multiprocessing.Array("i",[1,2,3,4,5])
,数组multiprocessing.Manager().dict()
,字典multiprocessing.Manager().list(range(5))
,列表进程通信(进程之间传递数据)用:
multiprocessing.Queue()
,单向通信multiprocessing.Pipe()
,双向通信例如,我们将上文案例中的各进程计算结果再加起来:
from multiprocessing import Process, Value
import ctypes
import os
# 进程要执行的代码
def task_func(num, val):
pid = os.getpid() # 获取当前进程的进程ID
n = sum(range(num))
val.value += n
print(f'{pid=}执行完成, 结果为{n=}')
return None
if __name__=='__main__':
nums = [123**4, 456**3, 234*2, 567**2]
pid = os.getpid() # 获取当前进程的进程IDn
print(f'{pid=}开始执行...')
procs = []
# val = Value('l', 0) # 定义一个全局长整型数字
val = Value(ctypes.c_ulong, 0) # 定义一个全局长整型数字
# 用参数实例化
for n in nums:
p = Process(target=task_func, args=(n, val))
procs.append(p)
p.start()
# 完成执行
for i in procs:
i.join()
print(f'全部完成!计算总和为:{val.value}')
'''
pid=8502开始执行...
pid=8506执行完成, 结果为n=109278
pid=8507执行完成, 结果为n=51677427816
pid=8505执行完成, 结果为n=4495303886411520
pid=8504执行完成, 结果为n=26194547099688120
全部完成!计算总和为:30689902663636734
'''
进程池之间共享变量是不能使用上文方式的,因为进程池内进程关系并非父子进程,想要共享,必须使用 Manager 模块来定义。
from multiprocessing import Pool, Manager
import os
# 进程要执行的代码
def task_func(num, g_list):
pid = os.getpid() # 获取当前进程的进程ID
n = sum(range(num))
g_list.append(n)
print(f'{pid=}执行完成, 结果为{n=}')
if __name__=='__main__':
pid = os.getpid() # 获取当前进程的进程IDn
print(f'{pid=}开始执行...')
cpu_cnt = os.cpu_count()
print(f'{cpu_cnt=}')
g_list = Manager().list()
p = Pool(cpu_cnt) # 创建一个进程池, 为 CPU 数量
for i in range(6666):
# 异步非阻塞方式,还可用 p.apply/p.map 等
p.apply_async(task_func, args=(i, g_list))
p.close() # 关闭进程池,不能继续在进程池中添加进程
p.join() # 等待所有子进程执行完毕,再执行下面的代码
print(f'全部完成!, 总结果为{sum(g_list)}。')
'''
...
pid=9356执行完成, 结果为n=22194453
pid=9354执行完成, 结果为n=22201116
pid=9357执行完成, 结果为n=22207780
全部完成!, 总结果为49345687160。
'''
在 Python 中多进程可以使用:
# 复制一个子进行,返回两次
import os
os.fork()
# 进程对象
from multiprocessing import Process
# 进程池
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor as Pool
# 子进程执行系统命令
import subprocess
subprocess.run('ls', shell=True)
三方模块 psutil 中提供了进程管理方法,引用其中的几个方法就能够获得进程的相关信息。
更新时间:April 12, 2022, 10:17 p.m. 标签:python 多进程 进程