Python 文档随译--multiprocessing模块

前言

Python官方文档以前没有认真读,现在仔细读一遍,并且做一下随译(就是说翻译地比较随意 呵呵)。水平有限,翻译可能存在问题,请自行斟酌。
本文对应的官方文档链接:multiprocessing

正文

简介

multiprocessing模块支持用和Treading模块API相似的语法生成进程。该模块支持本地和远程并发,利用subprocesses而不是threads,有效地避开了GIL的限制。该模块允许程序员在机器上充分利用多个CPU,该模块支持Windows和Unix平台。

multiprocessing模块同样提供了Threading模块所没有的,例如它提供的Pool对象,该对象具有一些方法,它可以让多个输入数据并行执行一个函数,实际上是通过将数据分配到每个进程中。下面是一个使用Pool对象实现并行化的简单例子。

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def f(x):
return x*x

if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))

输出为

1
[1, 4, 9]

Process类

在multiprocessing模块中,可以通过Process对象创建多个进程对象,并且通过start方法来调用它们。Process采用threading.Thread,下面的例子是一个简单的多进程的例子

1
2
3
4
5
6
7
8
9
from multiprocessing import Process

def f(name):
print 'hello', name

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

想要显示出每个进程的ID,可以参照下面的扩展例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import os

def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()

def f(name):
info('function f')
print 'hello', name

if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()

进程间交换对象

multiprocessing支持两种类型的对象作为进程间的交换通道。

Queue
Queue类几乎就是Queue.Queue类的克隆,看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()

需要注意,Queue既是线程安全的,同时又是进程安全的。

Pipes
pipe方法返回一对连接对象,它们是管道的两端,默认是双向的。下面是个例子

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()

pipe函数返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法。注意如果管道的两端同时从对方接收或者同时向对方发送数据则会导致数据被破坏。

进程同步

multiprocessing提供了和threading模块等价的同步原语。例如我们可以使用锁来确保某一时刻只有一个进程向终端输出信息。

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
print 'hello world', i
l.release()

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

如果不使用锁这些信息就会混杂在一起。

进程间共享状态

我们在进行并发编程时最好避免分享状态,再使用多进程时尤其要注意。
如果非要这么做,可以参照下面给出的 两种方式
共享内存
可以用Value或者Array类在共享存储区存储数据,例如下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

程序将会打印如下的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
```
参数d和i在创建num和arr时被array模块当做字节码使用,d表明是一个双精度浮点数,而i则指明一个有符号的整数类型。这些共享对象是线程和进程安全的。
想要更灵活得使用共享内存,可以使用multiprocessing.sharedctypes,它支持从共享内存创建专门的ctype对象。

##服务进程##
由Manager()方法返回的manager对象可以用于控制一个服务进程,它能管理Python对象并且允许其他进程通过代理操作这些数据。
由Manager()方法返回的manager对象支持以下这些类型:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array。下面举个例子
``` Python
from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
manager = Manager()

d = manager.dict()
l = manager.list(range(10))

p = Process(target=f, args=(d, l))
p.start()
p.join()

print d
print l

以上代码将打印出

1
2
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务进程管理相对于共享内存具有更高的灵活性因为它支持Python中原生的数据类型;同时一个manager对象可以为网络上不同计算机上的进程所共享,但缺点是比共享内存要慢。

使用workers池

Pool类表示一个工作进程池,它提供了几种不同的方法将任务分发到各个进程。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
return x*x

if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes

# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))

# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i

# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"

# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]

# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"

注意:pool的方法绝对不能被创建它的进程所使用。
注意:这个包下面的功能需要确保main模块对于子进程是可以导入的。这意味着在一些情况下程序不能正常工作,例如在交互模式下,看下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

文章目录
  1. 1. 前言
  2. 2. 正文
    1. 2.1. 简介
    2. 2.2. Process类
    3. 2.3. 进程间交换对象
    4. 2.4. 进程同步
    5. 2.5. 进程间共享状态
    6. 2.6. 使用workers池
,