前言 根据前面的应用背景,这里介绍Python多线程编程时实例应用于网络爬虫–纯的I/O密集型任务。
多线程实现方式 多线程在Python中有不止一种实现方式,可以通过继承Threading类,也可以单纯使用Thread类的方法,不同之处是前者是后者的封装和改进,一般用前者,因为其更加方便,当然偶尔存在需要使用Thread类才能完成的任务(如对子线程生命周期的控制,设置Threading的setDaemon方法可能不够用)。今天看到了一篇译文:《一行 Python 实现并行化 – 日常多线程操作的新思路》 ,当然也可以参照原作者 Chris的文章 ,上面提到了使用map函数实现更为简洁的线程并发方式。本文对这几种分别进行介绍。
方法一:使用Thread类的方法 这种方式最为”原始”,最简单的使用方式是直接在需要的时候调用Thread类的start_new_thread方法,将需要并行执行的函数及其需要的参数传进去就可以了。
方法二:使用Threading结合Queue使用 Python中的Queue模块实现了线程安全的队列结构(Queue中使用了Threading中提供的锁实现了同步与互斥),因此使用Queue就不用担心饥饿或者死锁问题的发生了。下面是一个较为通用的I/O多线程书写模板,参照网上看到的一篇文章:Python爬虫(六)–多线程续(Queue) 。关于Queue模块的介绍参见博客的另一篇文章Python Queue介绍 ,下面直接介绍多线程结合Queue的使用方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import threadingimport QueueSHARE_Q=Queue.Queue() _WORKED_THERAD_NUM=3 class MyThread (threading.Thread) : ''' 其中func为线程函数的逻辑部分 ''' def __init__ (self,func) : super(MyThread,self).__init__() self.func=func def run (self) : ''' 重写基类的run方法 ''' self.func() def do_something (item) : ''' 运行逻辑 ''' print item def worker () : ''' 用于写工作逻辑,只要队列不空就持续处理,为空时检查队列。 由于Queue中已经包含了wait notify 和响应的锁,所以不需要在取任务或者放任务的时候 加锁或解锁 ''' global SHARE_Q while not SHARE_Q.empty(): item=SHARE_Q.get() do_something(item) SHARE_Q.task_done() def main () : global SHARE_Q threads=[] for task in xrange(5 ): SHARE_Q.put(task) for i in xrange(_WORKED_THREAD_NUM): thread=MyThread(worker) thread.start() threads.append(thread) for thread in threads: thread.join() SHARE_Q.join() if __name__=='__main__' : main()
注意上面有一处和原文写的不一样,在worker函数内,原文书写方式为
1 2 3 4 5 while True : if not SHARE_Q.empty(): item=SHARE_Q.get() do_something(item) SHARE_Q.task_done()
实际使用时采取这种的方式会导致程序无法结束,怀疑是程序无法退出while循环(这块没有仔细调试,所以不能确定)。改成上面代码处的方式程序可以正常中止。这种方式可以作为一个书写的模板,在do_something函数内实现自己的逻辑,然后添加必要的变量或者控制语句,就可以实现比较简单的多线程代码。
方法三:使用map函数和multiprocessing模块 前面谈到了这是在别的博客中见到的一个比较简洁的多线程实现,刚好利用了map函数的功能。关于map函数的功能,可以看我的另一篇博客 Python 函数 ,里面讲到了map函数的用法。下面直接看map函数的效果。 先看一下map的实现效果,下面是用map对一系列url进行访问,并将结果放到列表results中返回。
1 results=map(urllib2.urlopen,['https://www.baidu.com' ,'http://cn.bing.com' ])
相当于下面的操作
1 2 3 4 urls=['https://www.baidu.com' ,'http://cn.bing.com' ] results=[] for url in urls: results.append(urllib2.urlopen(url))
map函数一手包办了序列操作、参数传递和结果保存等一系列的操作。借鉴这一点,选择multiprocessing模块中的子模块就可以实现线程并发。 在Python中由两个库包含了map函数,multiprocessing和它的子模块multiprocessing.dummy,前者用在多进程,而后者用于多线程。两者都有pool对象(线程池或者进程池)。使用两个pool的对象方法如下:
1 2 from multiprocessing import Poolfrom multiprocessing.dummy import Pool as ThreadPool
Pool对象有一些参数,第一个参数用于设定线程池中的线程数,默认为CPU的核数。线程不够,优势体现得不明显;线程过多,导致线程切换花的时间太长,划不来。在实际使用中最好根据需求人为设定。下面是针对上一个问题的多线程版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import urllib2from multiprocessing.dummy import Pool as ThreadPoolurls=[ 'http://www.python.org' , 'http://www.python.org/about/' , 'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html' , 'http://www.python.org/doc/' , 'http://www.python.org/download/' , 'http://www.python.org/getit/' , 'http://www.python.org/community/' , 'https://wiki.python.org/moin/' , 'http://planet.python.org/' , 'https://wiki.python.org/moin/LocalUserGroups' , 'http://www.python.org/psf/' , 'http://docs.python.org/devguide/' , 'http://www.python.org/community/awards/' ] pool=ThreadPool(4 ) results=pool.map(urllib2.urlopen,urls) pool.close() pool.join()
这里面核心只有4行代码,关键只有第23行这一行。而下面则是IBM经典教程 中使用模型的解决方案,足足40多行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 ''' A more realistic thread pool example ''' import time import threading import Queue import urllib2 class Consumer (threading.Thread) : def __init__ (self, queue) : threading.Thread.__init__(self) self._queue = queue def run (self) : while True : content = self._queue.get() if isinstance(content, str) and content == 'quit' : break response = urllib2.urlopen(content) print 'Bye byes!' def Producer () : urls = [ 'http://www.python.org' , 'http://www.yahoo.com' 'http://www.scala.org' , 'http://www.google.com' ] queue = Queue.Queue() worker_threads = build_worker_pool(queue, 4 ) start_time = time.time() for url in urls: queue.put(url) for worker in worker_threads: queue.put('quit' ) for worker in worker_threads: worker.join() print 'Done! Time taken: {}' .format(time.time() - start_time) def build_worker_pool (queue, size) : workers = [] for _ in range(size): worker = Consumer(queue) worker.start() workers.append(worker) return workers if __name__ == '__main__' : Producer()
相较之下,使用multiprocessing.dummpy中的pool对象的map方法要方便得多。
总结 以上三种方法只是众多多线程方法中的几种,除此之外我们当然可以使用Threading模块本身而不用Queue模块实现多线程,只是要自己实现一遍封装的过程。其实归根结底还是Thread和Threading模块,通此两者,再无难事:-)