前言
Python官方文档以前没有认真读,现在仔细读一遍,并且做一下随译(就是说翻译地比较随意 呵呵)。水平有限,翻译可能存在问题,请自行斟酌。
本文对应的官方文档链接:multiprocessing
正文
简介
multiprocessing模块支持用和Treading模块API相似的语法生成进程。该模块支持本地和远程并发,利用subprocesses而不是threads,有效地避开了GIL的限制。该模块允许程序员在机器上充分利用多个CPU,该模块支持Windows和Unix平台。
multiprocessing模块同样提供了Threading模块所没有的,例如它提供的Pool对象,该对象具有一些方法,它可以让多个输入数据并行执行一个函数,实际上是通过将数据分配到每个进程中。下面是一个使用Pool对象实现并行化的简单例子。1
2
3
4
5
6
7
8from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
输出为1
[1, 4, 9]
Process类
在multiprocessing模块中,可以通过Process对象创建多个进程对象,并且通过start方法来调用它们。Process采用threading.Thread,下面的例子是一个简单的多进程的例子1
2
3
4
5
6
7
8
9from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
想要显示出每个进程的ID,可以参照下面的扩展例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
进程间交换对象
multiprocessing支持两种类型的对象作为进程间的交换通道。
Queue
Queue类几乎就是Queue.Queue类的克隆,看下面的例子:1
2
3
4
5
6
7
8
9
10
11from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
需要注意,Queue既是线程安全的,同时又是进程安全的。
Pipes
pipe方法返回一对连接对象,它们是管道的两端,默认是双向的。下面是个例子1
2
3
4
5
6
7
8
9
10
11
12from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
pipe函数返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法。注意如果管道的两端同时从对方接收或者同时向对方发送数据则会导致数据被破坏。
进程同步
multiprocessing提供了和threading模块等价的同步原语。例如我们可以使用锁来确保某一时刻只有一个进程向终端输出信息。1
2
3
4
5
6
7
8
9
10
11
12from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用锁这些信息就会混杂在一起。
进程间共享状态
我们在进行并发编程时最好避免分享状态,再使用多进程时尤其要注意。
如果非要这么做,可以参照下面给出的 两种方式
共享内存
可以用Value或者Array类在共享存储区存储数据,例如下面的代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print num.value
print arr[:]
程序将会打印如下的内容:
1 | 3.1415927 |
以上代码将打印出1
2{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务进程管理相对于共享内存具有更高的灵活性因为它支持Python中原生的数据类型;同时一个manager对象可以为网络上不同计算机上的进程所共享,但缺点是比共享内存要慢。
使用workers池
Pool类表示一个工作进程池,它提供了几种不同的方法将任务分发到各个进程。例如1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
注意:pool的方法绝对不能被创建它的进程所使用。
注意:这个包下面的功能需要确保main模块对于子进程是可以导入的。这意味着在一些情况下程序不能正常工作,例如在交互模式下,看下面的例子1
2
3
4
5
6
7
8
9
10
11
12
13from multiprocessing import Pool
5) p = Pool(
def f(x):
return x*x
...
1,2,3]) p.map(f, [
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'