Python 并发编程系列--3、多线程编程方法

上一篇直通车 Python 并发编程系列–2、并发方式的选择

前言

根据前面的应用背景,这里介绍Python多线程编程时实例应用于网络爬虫–纯的I/O密集型任务。

多线程实现方式

多线程在Python中有不止一种实现方式,可以通过继承Threading类,也可以单纯使用Thread类的方法,不同之处是前者是后者的封装和改进,一般用前者,因为其更加方便,当然偶尔存在需要使用Thread类才能完成的任务(如对子线程生命周期的控制,设置Threading的setDaemon方法可能不够用)。今天看到了一篇译文:《一行 Python 实现并行化 – 日常多线程操作的新思路》,当然也可以参照原作者 Chris的文章,上面提到了使用map函数实现更为简洁的线程并发方式。本文对这几种分别进行介绍。

方法一:使用Thread类的方法

这种方式最为”原始”,最简单的使用方式是直接在需要的时候调用Thread类的start_new_thread方法,将需要并行执行的函数及其需要的参数传进去就可以了。

方法二:使用Threading结合Queue使用

Python中的Queue模块实现了线程安全的队列结构(Queue中使用了Threading中提供的锁实现了同步与互斥),因此使用Queue就不用担心饥饿或者死锁问题的发生了。下面是一个较为通用的I/O多线程书写模板,参照网上看到的一篇文章:Python爬虫(六)–多线程续(Queue)。关于Queue模块的介绍参见博客的另一篇文章Python Queue介绍,下面直接介绍多线程结合Queue的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#coding:utf-8
import threading
import Queue

SHARE_Q=Queue.Queue()
#线程个数
_WORKED_THERAD_NUM=3

class MyThread(threading.Thread):
'''
其中func为线程函数的逻辑部分
'''


def __init__(self,func):
super(MyThread,self).__init__()
self.func=func

def run(self):
'''
重写基类的run方法
'''

self.func()

def do_something(item):
'''
运行逻辑
'''

print item

def worker():
'''
用于写工作逻辑,只要队列不空就持续处理,为空时检查队列。
由于Queue中已经包含了wait notify 和响应的锁,所以不需要在取任务或者放任务的时候
加锁或解锁
'''


global SHARE_Q
while not SHARE_Q.empty():
item=SHARE_Q.get()
do_something(item)
SHARE_Q.task_done()

def main():
global SHARE_Q
threads=[]

for task in xrange(5):
SHARE_Q.put(task)

for i in xrange(_WORKED_THREAD_NUM):
thread=MyThread(worker)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()

SHARE_Q.join()

if __name__=='__main__':
main()

注意上面有一处和原文写的不一样,在worker函数内,原文书写方式为

1
2
3
4
5
while True:
if not SHARE_Q.empty():
item=SHARE_Q.get()
do_something(item)
SHARE_Q.task_done()

实际使用时采取这种的方式会导致程序无法结束,怀疑是程序无法退出while循环(这块没有仔细调试,所以不能确定)。改成上面代码处的方式程序可以正常中止。这种方式可以作为一个书写的模板,在do_something函数内实现自己的逻辑,然后添加必要的变量或者控制语句,就可以实现比较简单的多线程代码。

方法三:使用map函数和multiprocessing模块

前面谈到了这是在别的博客中见到的一个比较简洁的多线程实现,刚好利用了map函数的功能。关于map函数的功能,可以看我的另一篇博客 Python 函数,里面讲到了map函数的用法。下面直接看map函数的效果。
先看一下map的实现效果,下面是用map对一系列url进行访问,并将结果放到列表results中返回。

1
results=map(urllib2.urlopen,['https://www.baidu.com','http://cn.bing.com'])

相当于下面的操作

1
2
3
4
urls=['https://www.baidu.com','http://cn.bing.com']
results=[]
for url in urls:
results.append(urllib2.urlopen(url))

map函数一手包办了序列操作、参数传递和结果保存等一系列的操作。借鉴这一点,选择multiprocessing模块中的子模块就可以实现线程并发。
在Python中由两个库包含了map函数,multiprocessing和它的子模块multiprocessing.dummy,前者用在多进程,而后者用于多线程。两者都有pool对象(线程池或者进程池)。使用两个pool的对象方法如下:

1
2
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

Pool对象有一些参数,第一个参数用于设定线程池中的线程数,默认为CPU的核数。线程不够,优势体现得不明显;线程过多,导致线程切换花的时间太长,划不来。在实际使用中最好根据需求人为设定。下面是针对上一个问题的多线程版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls=[
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
]

#设定池子的大小
pool=ThreadPool(4)

results=pool.map(urllib2.urlopen,urls)
pool.close()
pool.join()

这里面核心只有4行代码,关键只有第23行这一行。而下面则是IBM经典教程中使用模型的解决方案,足足40多行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
'''
A more realistic thread pool example
'''


import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue

def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'


def Producer():
urls = [
'http://www.python.org', 'http://www.yahoo.com'
'http://www.scala.org', 'http://www.google.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()

# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()

print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers

if __name__ == '__main__':
Producer()

相较之下,使用multiprocessing.dummpy中的pool对象的map方法要方便得多。

总结

以上三种方法只是众多多线程方法中的几种,除此之外我们当然可以使用Threading模块本身而不用Queue模块实现多线程,只是要自己实现一遍封装的过程。其实归根结底还是Thread和Threading模块,通此两者,再无难事:-)

下一篇直通车 Python 并发编程系列–4、多进程编程方法

文章目录
  1. 1. 上一篇直通车 Python 并发编程系列–2、并发方式的选择
  • 前言
  • 多线程实现方式
    1. 1. 方法一:使用Thread类的方法
    2. 2. 方法二:使用Threading结合Queue使用
    3. 3. 方法三:使用map函数和multiprocessing模块
  • 总结
    1. 1. 下一篇直通车 Python 并发编程系列–4、多进程编程方法
  • ,