Python 并发编程笔记
Table of Contents
1. 守护线程
在 C/C++ 中,子线程随着主线程退出而退出,但是 Python 默会在主线程退出时前等待所有子线程退出。如果一个线程的 daemon 为 True,主线程退出后,这个子线程也要跟着退出。
#!/usr/bin/env python #coding=utf-8 import threading,time,logging logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-10s) %(message)s') def daemon(): logging.debug('Starting') time.sleep(2) logging.debug('Exiting') d = threading.Thread(name='daemon', target=daemon) #d.setDaemon(True) def non_daemon(): logging.debug('Starting') logging.debug('Exiting') t = threading.Thread(name='non-daemon',target=non_daemon) d.start() t.start()
执行后输出如下:
$ python test.py (daemon ) Starting (non-daemon) Starting (non-daemon) Exiting (daemon ) Exiting
如果把 d.setDaemon(True) 注释去掉,输出就会变成以下:
$ python test.py (daemon ) Starting (non-daemon) Starting (non-daemon) Exiting
因为 daemon 进程 sleep 还没完,主线程已经结束了,所以看不到(daemon ) Exiting。
2. Queue+Threading
这里,工作线程的代码大致如下:
while True: data = queue.get() .... queue.task_done()
先看看 Queue 里 get() 的实现:
150 def get(self, block=True, timeout=None): ...... 161 self.not_empty.acquire() 162 try: 163 if not block: 164 if not self._qsize(): 165 raise Empty 166 elif timeout is None: 167 while not self._qsize(): 168 self.not_empty.wait()
get 的 block 参数默认是 True,所以 queue.get() 将会执行到 167 行那句代码上,这时,如果队列空了,就会调用 self.not_empty.wait(),当前线程将被阻塞掉,接着就是发生了线程切换。
我的主线程最后句代码是:
queue.join()
再看看 join() 的实现:
79 self.all_tasks_done.acquire() 80 try: 81 while self.unfinished_tasks: #unfinished_tasks保存了当前队列中的任务数 82 self.all_tasks_done.wait() 83 finally: 84 self.all_tasks_done.release()
self.unfinished_tasks 保存了当前队列的任务数量,如果数量不为空,将执行到 self.all_tasks_done.wait(),这个时候主线程陷入阻塞中;如果为空的话,就结束 join。
但因为 threading 创建的线程默认不是守护线程,所以就产生一个矛盾:主线程执行完了 join,但其他线程还在 get 那里阻塞着的,而主线程又在等它们结束,所以程序自然无法自然结束了。
“守护线程”这个东西,就是说主线程必须等所有子线程结束了,才会退出;线程如果是守护线程的话,主线程不用等它们结束的。把 Thread 对象的 daemon 属性设置成 True,线程就成了守护线程了。
所以代码可以这么写(伪代码):
def work(): """这个是多线程调用的函数""" data = queue.get() ....这里是处理data的代码.... queue.task_done() def main(): t = Threading.thread() t.daemon = True t.start() ..... queue.join()
这个时候,虽然其他线程还在阻塞着的,但主线程会因为队列为空了,执行完 join,所以整个程序就结束了。
另外一个问题——queue.task_done 的位置,之前我以为这样就可以了:
data = queue.get()
queue.task_done()
...
再看看 task_done 的实现代码:
59 self.all_tasks_done.acquire() 60 try: 61 unfinished = self.unfinished_tasks - 1 62 if unfinished <= 0: 63 if unfinished < 0: 64 raise ValueError('task_done() called too many times') 65 self.all_tasks_done.notify_all() 66 self.unfinished_tasks = unfinished 67 finally: 68 self.all_tasks_done.release()
self.unfinished_tasks 保存着当前队列中任务数量,注意第 61 行,说明每次执行 task_done,self.unfinished_tasks 就减 1。join 结束的条件是 self.unfinished_tasks 为 0,具体代码看上面贴的。
所以,如果太着急调用 task_done 会产生一些问题,比如:
url = queue.get() queue.task_done() sock = urllib2.urlopen(url) ...
注意上面的代码,如果 url = queue.get() 刚好导致队列为空了,而 urlopen 产生了阻塞,所以发生了线程接换,如果不幸运恰恰切换到主线程,这时 queue.join 将导致整个程序结束掉!sock = urllib2.urlopen(url) 后面的代码还没来得及执行!
所以这样写才是正确的:
url = queue.get() sock = urllib2.urlopen(url) ... queue.task_done() # 这里保证了该执行的代码都执行完了,才通知队列删除一个元素
另:如果阅读代码水平不太差,遇到问题,就看看实现代码,在不懂的地方可以加点 print 之类的信息(不要认为标准模块就不能让咱们修改,照改不误)。看网上的文字描述性文章,理解概念性的东西可能很长时间都解决不了问题啊。
3. 线程间通信——Event
import time import threading end_event = threading.Event() def work1(): while not end_event.is_set(): print("work1") time.sleep(1) print("===work1 end===") def work2(): i = 0 while i < 10: print("work2") i += 1 time.sleep(1) end_event.set() t1 = threading.Thread(target=work1) t2 = threading.Thread(target=work2) t1.start() t2.start() t1.join() t2.join()
4. 进程池
import os import requests import multiprocessing def task(i): print(f"pid: {os.getpid()}") resp = requests.get("https://www.shellcodes.org") return resp.text multiprocessing.freeze_support() pool = multiprocessing.Pool() htmls = [] for i in range(0, 4): result = pool.apply_async(task, args=(i,)) htmls.append(result) pool.close() pool.join() # 调用 join 前必须调用 close() 或者 terminate() for html in htmls: print(html.get())
5. 线程池
以前 Python2 需要自己实现线程池,Python3 中直接用 concurrent.futures 的 ThreadPoolExecutor 类即可。
示例代码:
import requests from concurrent.futures import ThreadPoolExecutor def http_req(): resp = requests.get("https://www.shellcodes.org") return resp.status_code pool = ThreadPoolExecutor(10) status_codes = [] for i in range(10): # 每个线程都可以获得返回值 ret = pool.submit(http_req) status_codes.append(ret.result()) for status_code in status_codes: print(status_code)