Queue 里 task_done 方法使用注意
Queue 是 Python 标准库的队列实现。在网上经常看到示例代码是这样实现的:
url = queue.get()
queue.task_done()
...
思路本身没什么问题——从队列中取一个数据,然后通知队列取成功了。但有些人代码是这样写的:
url = queue.get() queue.task_done() try: sock = urllib2.urlopen(url) except: ...
如果是在多线程环境下,这会带来个问题:若是 url = queue.get() 刚好把队列取空了,而由于执行了 urlopen,所以当前线程产生了阻塞,发生了线程切换,如果执行权限刚好落在了主线程,这时如果主线程的 queue.join 将导致整个程序结束掉。而 sock = urllib2.urlopen(url) 后面的代码都还没来得及执行。
为什么呢?
我们看下 Queue.py 里对 join 函数如何实现的:
79 self.all_tasks_done.acquire() 80 try: 81 while self.unfinished_tasks: # unfinished_tasks 保存了当前队列中的数目 82 self.all_tasks_done.wait() 83 finally: 84 self.all_tasks_done.release()
上面代码里有个 while 循环,unfinished_tasks 存储了当前队列里的数据数目,如果 unfinished_tasks 不为 0,将会一直调用 all_tasks_done.wait 阻塞住。只要 unfinished_tasks 不是 0,join 就可以一直工作下去。
而 task_done 正是减小了 unfinished_tasks。不妨再看看 task_done 的实现代码:
59 self.all_tasks_done.acquire() 60 try: 61 unfinished = self.unfinished_tasks - 1 62 if unfinished <= 0: 63 if unfinished < 0: 64 raise ValueError('task_done() called too many times') 65 self.all_tasks_done.notify_all() 66 self.unfinished_tasks = unfinished 67 finally: 68 self.all_tasks_done.release()
注意 61 行,表明每次执行了 task_done,unfinished_tasks 就减 1,直到 unfinished_tasks 为 0。
所以,在多线程下,想让队列变得正常的话,注意 task_done 的位置:
url = queue.get() ... try: sock = urllib2.urlopen(url) except: ... queue.task_done() # 要在一切都搞定后,再告诉队列空了。