上一篇直通车 Python 并发编程系列–2、并发方式的选择
前言
根据前面的应用背景,这里介绍Python多线程编程时实例应用于网络爬虫–纯的I/O密集型任务。
多线程实现方式
多线程在Python中有不止一种实现方式,可以通过继承Threading类,也可以单纯使用Thread类的方法,不同之处是前者是后者的封装和改进,一般用前者,因为其更加方便,当然偶尔存在需要使用Thread类才能完成的任务(如对子线程生命周期的控制,设置Threading的setDaemon方法可能不够用)。今天看到了一篇译文:《一行 Python 实现并行化 – 日常多线程操作的新思路》,当然也可以参照原作者 Chris的文章,上面提到了使用map函数实现更为简洁的线程并发方式。本文对这几种分别进行介绍。
方法一:使用Thread类的方法
这种方式最为”原始”,最简单的使用方式是直接在需要的时候调用Thread类的start_new_thread方法,将需要并行执行的函数及其需要的参数传进去就可以了。
方法二:使用Threading结合Queue使用
Python中的Queue模块实现了线程安全的队列结构(Queue中使用了Threading中提供的锁实现了同步与互斥),因此使用Queue就不用担心饥饿或者死锁问题的发生了。下面是一个较为通用的I/O多线程书写模板,参照网上看到的一篇文章:Python爬虫(六)–多线程续(Queue)。关于Queue模块的介绍参见博客的另一篇文章Python Queue介绍,下面直接介绍多线程结合Queue的使用方法。
1 | #coding:utf-8 |
注意上面有一处和原文写的不一样,在worker函数内,原文书写方式为1
2
3
4
5while True:
if not SHARE_Q.empty():
item=SHARE_Q.get()
do_something(item)
SHARE_Q.task_done()
实际使用时采取这种的方式会导致程序无法结束,怀疑是程序无法退出while循环(这块没有仔细调试,所以不能确定)。改成上面代码处的方式程序可以正常中止。这种方式可以作为一个书写的模板,在do_something函数内实现自己的逻辑,然后添加必要的变量或者控制语句,就可以实现比较简单的多线程代码。
方法三:使用map函数和multiprocessing模块
前面谈到了这是在别的博客中见到的一个比较简洁的多线程实现,刚好利用了map函数的功能。关于map函数的功能,可以看我的另一篇博客 Python 函数,里面讲到了map函数的用法。下面直接看map函数的效果。
先看一下map的实现效果,下面是用map对一系列url进行访问,并将结果放到列表results中返回。1
results=map(urllib2.urlopen,['https://www.baidu.com','http://cn.bing.com'])
相当于下面的操作1
2
3
4urls=['https://www.baidu.com','http://cn.bing.com']
results=[]
for url in urls:
results.append(urllib2.urlopen(url))
map函数一手包办了序列操作、参数传递和结果保存等一系列的操作。借鉴这一点,选择multiprocessing模块中的子模块就可以实现线程并发。
在Python中由两个库包含了map函数,multiprocessing和它的子模块multiprocessing.dummy,前者用在多进程,而后者用于多线程。两者都有pool对象(线程池或者进程池)。使用两个pool的对象方法如下:1
2from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
Pool对象有一些参数,第一个参数用于设定线程池中的线程数,默认为CPU的核数。线程不够,优势体现得不明显;线程过多,导致线程切换花的时间太长,划不来。在实际使用中最好根据需求人为设定。下面是针对上一个问题的多线程版本。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls=[
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
]
#设定池子的大小
pool=ThreadPool(4)
results=pool.map(urllib2.urlopen,urls)
pool.close()
pool.join()
这里面核心只有4行代码,关键只有第23行这一行。而下面则是IBM经典教程中使用模型的解决方案,足足40多行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54'''
A more realistic thread pool example
'''
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Producer():
urls = [
'http://www.python.org', 'http://www.yahoo.com'
'http://www.scala.org', 'http://www.google.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
相较之下,使用multiprocessing.dummpy中的pool对象的map方法要方便得多。
总结
以上三种方法只是众多多线程方法中的几种,除此之外我们当然可以使用Threading模块本身而不用Queue模块实现多线程,只是要自己实现一遍封装的过程。其实归根结底还是Thread和Threading模块,通此两者,再无难事:-)