Python中的多进程multiprocessing包
multiprocessing包类可以实现跨平台兼容创建子进程,由于使用子进程而非线程有效地绕过了GIL(全局解释器锁),因此,可以充分利多个处理器
在windows中Process()必须放到if __name__ == '__main__'
语句下,否则会抛出异常
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)
使用if __name__ == '__main__'
语句将不会在导入时被调用
multiprocessing包中常用的类
和threading模块使用方式相似的类 栅栏对象: Barrier 信号量对象:Semaphore 有界信号量对象:BoundedSemaphore 条件变量: Condition 事件对象: Event 原始锁对象:Lock 递归锁对象: RLock 创建进程: Process 连接对象:Connection 队列: Queue SimpleQueue JoinableQueue 共享对象: Value Array 管理器对象: managers.BaseManager 进程池对象:Pool 监听器及客户端: multiprocessing.connection模块
常用的和threading模块使用方式相似的类
class multiprocessing.Barrier(parties[, action[, timeout]]):栅栏对象,类似于threading.Barrier
class multiprocessing.Semaphore([value]):信号量对象,类似于threading.Semaphore
一个小小的不同在于,它的acquire方法的第一个参数名是block而非blocking
class multiprocessing.BoundedSemaphore([value]):有界信号量对象,类似threading.BoundedSemaphore
一个小小的不同在于,它的acquire 方法的第一个参数名是block而非blocking
class multiprocessing.Condition([lock]):条件变量,类似于threading.Condition
指定的lock参数应该是multiprocessing模块中的Lock或者RLock对象
class multiprocessing.Event():事件对象,类似于threading.Event
class multiprocessing.Lock():原始锁对象,类似于threading.Lock
一个小小的不同在于,它的acquire 方法的第一个参数名是block而非blocking
class multiprocessing.RLock():递归锁对象: 类似于threading.RLock
一个小小的不同在于,它的acquire 方法的第一个参数名是block而非blocking
multiprocessing包的方法
multiprocessing.active_children():返回当前进程存活的子进程的列表,调用该方法有“等待”已经结束的进程的副作用
multiprocessing.cpu_count():返回系统的CPU数量
该数量不同于当前进程可以使用的CPU数量
可用的CPU数量可以由len(os.sched_getaffinity(0))方法获得
multiprocessing.current_process():返回与当前进程相对应的Process对象
multiprocessing.parent_process():返回父进程Process对象,主进程会返回None
multiprocessing.freeze_support():提供冻结以产生 Windows可执行文件的支持,如PyInstaller
需要在main模块的if __name__ == '__main__'该行之后马上调用该函数
def f():
print('hello world!')
if __name__ == '__main__':
multiprocessing.freeze_support()
multiprocessing.Process(target=f).start()
如果没有调用freeze_support(),在尝试运行被冻结的可执行文件时会抛出RuntimeError异常
对freeze_support()的调用在非 Windows 平台上是无效的
multiprocessing.get_all_start_methods()
返回支持的启动方法的列表,该列表的首项即为默认选项
可能的启动方法有 'fork' , 'spawn' 和 'forkserver'
在Windows中只有'spawn'是可用的
Unix平台总是支持 'fork' 和 'spawn' ,且'fork'是默认值
multiprocessing.get_context(method=None)
返回一个Context对象
该对象具有和 multiprocessing 模块相同的API
method指定启动方法,应该是'fork', 'spawn', 'forkserver',如果指定的启动方法不存在,抛出ValueError异常
如果method为None将返回默认上下文对象
multiprocessing.get_start_method(allow_none=False):返回启动进程时使用的启动方法名
如果启动方法已经固定,并且allow_none为False,返回默认的启动方法名
如果启动方法没有设定,并且allow_none为True ,将返回None
'fork' 是 Unix 的默认值,'spawn'是Windows 和macOS的默认值。
multiprocessing.set_executable(executable)
设置在启动子进程时使用的Python解释器路径
默认使用sys.executable
如嵌入式编程人员可能这样做
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
创建进程对象
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process类拥有和threading.Thread等价的大部分方法
应始终使用关键字参数调用构造函数
group应该始终是None,它仅用于兼容threading.Thread
target是由run()方法调用的可调用对象
name是进程名称
args是目标调用的参数元组
kwargs是目标调用的关键字参数字典
daemon是否将进程设置为守护进程,默认继承自创建进程
当父进程退出时,会尝试终止其所有子进程中的守护进程
不允许在守护进程中创建子进程,因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程
如果子类重写该构造函数,必须确保它在对进程执行任何其他操作之前调用基类构造函数(Process.__init__() )
进程对象的属性
name:进程的名称,没有语义,仅用于识别目的。可以为多个进程指定相同的名称
默认为一个形式为'Process-N1:N2:...:Nk'的名称,其中每个Nk是第N个子进程
daemon:该进程是否是一个守护进程,必须在进程start()被调用之前设置
pid:返回进程ID,在生成该进程之前,该值为None
exitcode:进程的退出代码。如果尚未终止则为None
authkey:进程的身份验证密钥(字节字符串)
当multiprocessing初始化时,主进程使用os.urandom()分配一个随机字符串
当创建Process对象时,将继承其父进程的身份验证密钥
可以通过将authkey设置为另一个字节字符串来更改
sentinel:系统对象的数字句柄,当进程结束时将变为"ready"
当使用multiprocessing.connection.wait()一次等待多个事件时可以使用此值。否则调用join()更简单
进程对象的方法
run():在当前进程中运行指定函数
start():创建新的进程,然后运行指定函数
相当于创建一个新进程,然后在新进程中运行run()方法
join([timeout]):阻塞主进程指定的秒数
不指定则阻塞至调用该方法的进程终止
一个进程可以被 join 多次
进程无法join自身,因为这会导致死锁
不要尝试在启动进程之前join进程
is_alive():返回进程是否存活
terminate():终止进程。在Unix上使用SIGTERM信号完成;在Windows上使用TerminateProcess()
不会执行退出处理程序和finally子句等
进程的后代进程不会被终止,它们将简单地变成孤立的
如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用
如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁
kill():与terminate()相同,但在Unix上使用SIGKILL信号
close():关闭进程对象,释放与之关联的所有资源
如果底层进程仍在运行,则会引发ValueError
一旦close()成功返回,进程对象的大多数其他方法和属性将引发ValueError
如果一个进程在尝试使用 Queue 期间被 Process.terminate() 或 os.kill() 调用终止了,那么队列中的数据很可能被破坏。 这可能导致其他进程在尝试使用该队列时发生异常
连接对象
multiprocessing.Pipe(duplex=True)
返回一对Connection对象(conn1, conn2),分别表示管道的两端
duplex为True那么该管道是双向的。duplex为False,那么该管道是单向的,即conn1只能用于接收消息,而conn2仅能用于发送消息
连接对象(Connection):允许收发可以序列化的对象或字符串。它们可以看作面向消息的连接套接字
send(obj):将一个对象发送到连接的另一端,发送的对象必须是可以序列化的,过大的对象(接近32MiB+,这个值取决于操作系统)有可能引发ValueError异常
recv():返回由另一端发送的对象。会一直阻塞直到接收到对象。如果对端关闭了连接或者没有东西可接收,将抛出EOFError异常
fileno():返回由连接对象使用的描述符或者句柄
close():关闭连接对象
poll([timeout]):返回连接对象中是否有可以读取的数据
timeout表示最大阻塞的秒数
未指定timeout会马上返回
timeout为None将一直等待不会超时
使用multiprocessing.connection.wait()可以一次轮询多个连接对象
队列对象
class multiprocessing.Queue([maxsize])
返回一个使用一个管道和少量锁和信号量实现的共享队列实例
当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中
qsize():返回队列的大致长度。由于多进程这个数字是不可靠的
这可能会在Unix平台上引起NotImplementedError,因为没有实现sem_getvalue()
empty():返回队列是否是空的。由于多进程该状态是不可靠的
full():返回队列是否是满的。 由于多进程该状态是不可靠的
put(obj, block=True, timeout=None):将 obj 放入队列
block是bool值,表示是否阻塞当前进程,直到有空的缓冲槽或timeout超时
在阻塞了timeout秒之后还是没有可用的缓冲槽时抛出queue.Full异常
block是False时,当没有可用缓冲槽放入对象时抛出queue.Full异常
put_nowait(obj):相当于put(obj, False)
get([block[, timeout]]):从队列中取出并返回对象
block是bool值, 表示是否阻塞当前进程,直到队列中出现可用的对象
在阻塞了timeout秒之后还是没有可用的对象时抛出queue.Empty异常
block是False时,当没有能够取出的可用对象时抛出queue.Empty异常
get_nowait():相当于get(False)
close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用
join_thread():仅在调用了close()方法之后可用,会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中
如果一个不是队列创建者的进程试图退出,它会尝试等待这个队列的后台线程
这个进程可以使用cancel_join_thread()让join_thread()方法什么都不做直接跳过
cancel_join_thread():防止join_thread()方法阻塞当前进程。防止进程退出时自动等待后台线程退出
这个方法的别名:allow_exit_without_flush()
可能会导致已排入队列的数据丢失,仅适用于当需要当前进程立即退出而不必等待将已排入的队列更新到下层管道,并且不担心丢失数据的时候
class multiprocessing.SimpleQueue
是一个简化的Queue类的实现,很像带锁的Pipe
close():关闭队列:释放内部资源
empty():队列是否为空
put(item):将item放入队列
get():从队列中移出并返回一个对象
class multiprocessing.JoinableQueue([maxsize])
是Queue的子类,额外添加了task_done()和join()方法
task_done():指出之前进入队列的任务已经完成
由队列的消费者进程使用
每次调用get()获取的任务,执行完成后调用task_done()告诉队列该任务已经处理完成
如果被调用的次数多于放入队列中的项目数量,将引发ValueError异常
join():阻塞至队列中所有的元素都被接收和处理完毕
当条目添加到队列的时候,未完成任务的计数就会增加
每当消费者进程调用task_done()未完成计数就会减少
当未完成计数降到零的时候,join()阻塞被解除
共享对象
class multiprocessing.Value(typecode_or_type, *args, lock=True)
返回一个从共享内存上创建的ctypes对象,可以通过value属性访问这个对象本身
typecode_or_type指明了返回的对象类型: 可能是一个ctypes类型或者array
lock表示同步对于此值的访问操作
为True会新建一个递归锁
为Lock或者RLock对象,那么使用这个锁用于同步对这个值的访问操作
为False那么对这个对象的访问将没有锁保护,也就是说这个变量不是进程安全的
诸如+=这类的操作并不具有原子性。所以,可以这样写
with counter.get_lock():
counter.value += 1
class multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
从共享内存中申请并返回一个具有ctypes类型的数组对象
typecode_or_type指明了返回的数组中的元素类型: 它可能是一个ctypes类型或者array
如果size_or_initializer是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0
否则,size_or_initializer会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度
管理器
管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager():返回一个已启动的SyncManager管理器对象
这个对象可以用于在不同进程中共享数据
返回的管理器对象对应了一个已经启动的子进程
并且拥有一系列方法可以用于创建共享对象、返回对应的代理
当管理器被垃圾回收或者父进程退出时,管理器进程会立即退出
管理器类定义在multiprocessing.managers模块:
class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
创建一个管理器BaseManager对象
一旦创建,应该及时调用start()或者get_server().serve_forever()以确保管理器对象对应的管理进程已经启动
address是管理器服务进程监听的地址。如果是None则允许和任意主机的请求建立连接
authkey是认证标识,用于检查连接服务进程的请求合法性,相当于连接密码
必须是byte类型的字符串
为None会使用multiprocessing.current_process().authkey创建随机
start([func[, args]]):为管理器开启一个子进程,子进程在启动时会调用func(args)
manager = BaseManager(address=('', 50000), authkey=b'passwd')
manager.start()
get_server():返回一个Server对象,它是管理器在后台控制的真实的服务
Server对象拥有serve_forever()方法,该方法不会开启子进程,会在当前进程中开启
manager = BaseManager(address=('', 50000), authkey=b'passwd')
server = manager.get_server()
server.serve_forever()
connect():将本地管理器对象连接到一个远程管理器进程:
m = BaseManager(address=('162.31.12.14', 50000), authkey=b'abc')
m.connect()
shutdown():停止管理器进程。只能用于已经使用start()启动的进程
进程池对象
class multiprocessing.pool.Pool(processes=None, func=None, args=None, maxtasksperchild=None, context=None)
获取一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现
processes指定该进程池的最大进程数量,为None则使用os.cpu_count()
每个工作进程将会在启动时调用func(*args)
maxtasksperchild是一个进程在它退出或被新工作进程代替之前能完成的任务数量。默认工作进程寿与池齐
context可被用于指定启动的工作进程的上下文
进程池对象具有需要正确管理内部资源,可以将进程池用作上下文管理器,或者手动调用close()和terminate(),未做此类操作将导致进程在终结阶段挂起
apply(func, args=None, kwds=None):使用args参数以及kwds命名参数调用func, 它会返回结果前阻塞
apply_async(func, args=None, kwds=None, callback=None, error_callback=None)
apply()方法的异步变种,返回一个AsyncResult对象
callback必须是一个接受单个参数的可调用对象
error_callback必须是一个接受单个参数的可调用对象,目标函数执行失败时会将抛出的异常对象作为参数传递
回调函数应该立即执行完成,否则会阻塞负责处理结果的线程
map(func, iterable, chunksize=1)
内置 map() 函数的并行版本
这个方法会将可迭代对象分割为许多块,然后提交给进程池
只支持一个iterable参数,多个可迭代对象使用starmap()
它会保持阻塞直到获得结果
chunksize设置为一个正整数从而(近似)指定每个块的大小
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
map()方法的异步变种,返回一个AsyncResult对象
starmap(func, iterable, chunksize=1):和map()类似,不过iterable中的每一项会被解包再作为函数参数
starmap_async(func, iterable, chunksize=1, callback=None, error_callback=None)
相当于starmap()与map_async()的结合,迭代iterable的每一项,解包作为 func 的参数并执行
返回一个AsyncResult对象
imap(func, iterable, chunksize=1):map()的延迟执行版本,返回一个迭代器
imap_unordered(func, iterable, chunksize=1):和imap()相同,只不过通过迭代器返回的结果是无序的
close():阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出
terminate():不等待未完成的任务,立即停止工作进程
join():等待工作进程结束,调用join()前必须先调用close()或者terminate()
class multiprocessing.pool.AsyncResult()
Pool.apply_async()和Pool.map_async()返回对象所属的类
get(timeout=None):用于获取执行结果
wait(timeout=None):阻塞直到返回结果
ready():返回执行状态,是否已经完成
successful():判断调用是否已经完成并且未引发异常。如果还未获得结果则将引发ValueError
监听器及客户端
通常情况下,进程间通过队列或者Pipe()返回的Connection传递消息
不过,multiprocessing.connection模块其实提供了一些更灵活的特性
multiprocessing.connection.deliver_challenge(connection, authkey)
发送一个随机生成的消息到另一端,并等待回复
如果收到的回复与使用authkey作为键生成的信息摘要匹配成功,就会发送一个欢迎信息给管道另一端。否则抛出AuthenticationError异常
multiprocessing.connection.answer_challenge(connection, authkey)
接收一条信息,使用authkey作为键计算信息摘要,然后将摘要发送回去
如果没有收到欢迎消息,就抛出AuthenticationError异常
multiprocessing.connection.Client(address, family=None, authkey=None)
尝试与address地址上的监听器建立一个连接,返回Connection
family参数设置连接类型,通常可以省略,因为可以通过address的格式推导出来
如果authkey不是None,必须是一个字符串并且会被当做基于 HMAC 认证的密钥,认证失败抛出AuthenticationError异常
如果authkey是None 则不会有认证行为