Python 并发编程笔记

Table of Contents

1. 守护线程

在 C/C++ 中,子线程随着主线程退出而退出,但是 Python 默会在主线程退出时前等待所有子线程退出。如果一个线程的 daemon 为 True,主线程退出后,这个子线程也要跟着退出。

#!/usr/bin/env python
#coding=utf-8
import threading,time,logging
logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-10s) %(message)s')

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
#d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon',target=non_daemon)
d.start()
t.start()

执行后输出如下:

$ python test.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

如果把 d.setDaemon(True) 注释去掉,输出就会变成以下:

$ python test.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

因为 daemon 进程 sleep 还没完,主线程已经结束了,所以看不到(daemon ) Exiting。

2. Queue+Threading

这里,工作线程的代码大致如下:

while True:
   data = queue.get()
   ....
   queue.task_done()

先看看 Queue 里 get() 的实现:

150     def get(self, block=True, timeout=None):
......
161         self.not_empty.acquire()
162         try:
163             if not block:
164                 if not self._qsize():
165                     raise Empty
166             elif timeout is None:
167                 while not self._qsize():
168                     self.not_empty.wait()

get 的 block 参数默认是 True,所以 queue.get() 将会执行到 167 行那句代码上,这时,如果队列空了,就会调用 self.not_empty.wait(),当前线程将被阻塞掉,接着就是发生了线程切换。

我的主线程最后句代码是:

queue.join()

再看看 join() 的实现:

79         self.all_tasks_done.acquire()
80         try:
81             while self.unfinished_tasks:    #unfinished_tasks保存了当前队列中的任务数
82                 self.all_tasks_done.wait()
83         finally:
84             self.all_tasks_done.release()

self.unfinished_tasks 保存了当前队列的任务数量,如果数量不为空,将执行到 self.all_tasks_done.wait(),这个时候主线程陷入阻塞中;如果为空的话,就结束 join。

但因为 threading 创建的线程默认不是守护线程,所以就产生一个矛盾:主线程执行完了 join,但其他线程还在 get 那里阻塞着的,而主线程又在等它们结束,所以程序自然无法自然结束了。

“守护线程”这个东西,就是说主线程必须等所有子线程结束了,才会退出;线程如果是守护线程的话,主线程不用等它们结束的。把 Thread 对象的 daemon 属性设置成 True,线程就成了守护线程了。

所以代码可以这么写(伪代码):

def work():
"""这个是多线程调用的函数"""
    data = queue.get()

    ....这里是处理data的代码....

    queue.task_done()

def main():
    t = Threading.thread()
    t.daemon = True
    t.start()
    .....
    queue.join()

这个时候,虽然其他线程还在阻塞着的,但主线程会因为队列为空了,执行完 join,所以整个程序就结束了。

另外一个问题——queue.task_done 的位置,之前我以为这样就可以了:

data = queue.get()
queue.task_done()
...

再看看 task_done 的实现代码:

59         self.all_tasks_done.acquire()
60         try:
61             unfinished = self.unfinished_tasks - 1
62             if unfinished <= 0:
63                 if unfinished < 0:
64                     raise ValueError('task_done() called too many times')
65                 self.all_tasks_done.notify_all()
66             self.unfinished_tasks = unfinished
67         finally:
68             self.all_tasks_done.release()

self.unfinished_tasks 保存着当前队列中任务数量,注意第 61 行,说明每次执行 task_done,self.unfinished_tasks 就减 1。join 结束的条件是 self.unfinished_tasks 为 0,具体代码看上面贴的。

所以,如果太着急调用 task_done 会产生一些问题,比如:

url = queue.get()
queue.task_done()

sock = urllib2.urlopen(url)
...

注意上面的代码,如果 url = queue.get() 刚好导致队列为空了,而 urlopen 产生了阻塞,所以发生了线程接换,如果不幸运恰恰切换到主线程,这时 queue.join 将导致整个程序结束掉!sock = urllib2.urlopen(url) 后面的代码还没来得及执行!

所以这样写才是正确的:

url = queue.get()

sock = urllib2.urlopen(url)
...
queue.task_done() # 这里保证了该执行的代码都执行完了,才通知队列删除一个元素

另:如果阅读代码水平不太差,遇到问题,就看看实现代码,在不懂的地方可以加点 print 之类的信息(不要认为标准模块就不能让咱们修改,照改不误)。看网上的文字描述性文章,理解概念性的东西可能很长时间都解决不了问题啊。

3. 线程间通信——Event

import time
import threading

end_event = threading.Event()


def work1():
    while not end_event.is_set():
        print("work1")
        time.sleep(1)

    print("===work1 end===")


def work2():
    i = 0

    while i < 10:
        print("work2")
        i += 1
        time.sleep(1)

    end_event.set()


t1 = threading.Thread(target=work1)
t2 = threading.Thread(target=work2)

t1.start()
t2.start()

t1.join()
t2.join()

4. 进程池

import os
import requests
import multiprocessing


def task(i):
    print(f"pid: {os.getpid()}")
    resp = requests.get("https://www.shellcodes.org")
    return resp.text


multiprocessing.freeze_support()
pool = multiprocessing.Pool()

htmls = []
for i in range(0, 4):
    result = pool.apply_async(task, args=(i,))
    htmls.append(result)

pool.close()
pool.join()                     # 调用 join 前必须调用 close() 或者 terminate()

for html in htmls:
    print(html.get())

5. 线程池

以前 Python2 需要自己实现线程池,Python3 中直接用 concurrent.futures 的 ThreadPoolExecutor 类即可。

示例代码:

import requests
from concurrent.futures import ThreadPoolExecutor


def http_req():
    resp = requests.get("https://www.shellcodes.org")
    return resp.status_code


pool = ThreadPoolExecutor(10)

status_codes = []

for i in range(10):
    # 每个线程都可以获得返回值
    ret = pool.submit(http_req)
    status_codes.append(ret.result())


for status_code in status_codes:
    print(status_code)