注: 与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》
Python线程允许程序的不同部分同时运行,并可以简化设计。如果你对Python有一些经验,并且希望使用线程为程序加速,那么本文就是为你准备的!
什么是线程?
线程是一个独立的流,这意味着你的程序可以同时做两件事,但是,对于大多数Python程序,不同的线程实际上并不同时执行,它们只是看起来像是同时执行。
人们很容易认为线程是在程序上运行两个(或更多)不同的处理器,每个处理器同时执行一个独立的任务。这种看法大致正确,线程可能在不同的处理器上运行,但一个处理器一次只能运行一个线程。
要同时运行多个任务,不能用Python的标准方式实现,可以用不同的编程语言,或者多个进程实现,这样做的开发成本就高了。
由于用CPython实现了Python业务,线程可能不会加速所有任务,这是GIL(全称Global Interpreter Lock)的原因,一次只能运行一个Python线程。
如果某项任务需要花费大量时间等待外部事件,那么就可以应用多线程。如果是需要对CPU占用高并且花费很少时间等待外部事件,多线程可能枉费。
对于用Python编写并在标准CPython实现上运行的代码,这是正确的。如果你的线程是用C编写的,那么它们就能够释放GIL、并发运行。如果你在不同的Python实现上运行,也可以查看文档,了解它如何处理线程。
如果你正在运行一个标准的Python程序,只使用Python编写,并且有一个CPU受限的问题,那么你应该用多进程解决此问题。
将程序架构为使用线程也可以提高设计的清晰度。你将在下文中学习的大多数示例不一定会运行得更快,因为它们使用线程。在这些示例中使用线程有助于使设计更清晰、更易于推理。
所以,让我们停止谈论线程并开始使用它!
创建一个线程
现在你已经知道了什么是线程,让我们来学习如何制作线程。Python标准库提供了线程模块threading,它包含了你将在本文中看到的大部分内容。在这个模块中,Thread是对线程的封装,提供了简单的实现接口。
要创建一个线程,需要创建Thread的实例,然后调用它的.start()方法:
import logging
import threading
import time
def thread_function(name):
logging.info("Thread %s: starting", name)
time.sleep(2)
logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
logging.info("Main : before creating thread")
x = threading.Thread(target=thread_function, args=(1,))
logging.info("Main : before running thread")
x.start()
logging.info("Main : wait for the thread to finish")
# x.join()
logging.info("Main : all done")
如果你查看日志,可以看到在main部分正在创建和启动线程:
x = threading.Thread(target=thread_function, args=(1,))
x.start()
用函数thread_function()和arg(1,)创建一个Thread实例。在本文中用整数作为线程的名称,threading.get_ident()可以返回线程的名称,但可读性较差。
thread_function()函数的作用不大,它只是记录一些日志消息,在这些消息之间加上time.sleep()。
当你执行此程序时,输出将如下所示:
$ ./single_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
Thread 1: finishing
你会注意到代码的main部分结束之后,Thread才结束。后面会揭示这么做的原因。
守护线程
在计算机科学中,daemon是在后台运行的程序。
Python的threading模块对daemon有更具体的含义。当程序退出时,守护线程会立即关闭。考虑这些定义的一种方法是将daemon视为在后台运行的线程,而不必担心关闭它。
如果程序中正在执行的Threads不是daemons,则程序将在终止之前等待这些线程完成。然而,如果Threads是daemons,当程序退出时,它们就终止了。
让我们更仔细地看看上面程序的输出,最后两行是有点意思的。当运行这个程序时,在__main__打印完all done后以及线程结束之前会暂停大约2秒。
这个暂停是Python等待非后台线程完成。当Python程序结束时,关闭操作是清除线程中的程序。
如果查看threading模块的源代码,你将看到threading._shutdown()方法,它会遍历所有正在运行的线程,并在每一个没有设置daemon标志的线程上调用.join()方法。
因此,程序在退出时会等待,因为线程本身正在sleep(time.sleep(2))中。一旦完成并打印了消息,.join() 将返回,程序才可以退出。
通常,这是你想要的,但是我们还有其他的选择。让我们首先使用一个daemon线程来重复这个程序。你可以修改Thread实例化时的参数,添加daemon=True:
x = threading.Thread(target=thread_function, args=(1,), daemon=True)
现在运行程序时,应看到以下输出:
$ ./daemon_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
与前面不同的是,前面所输出的最后一行在这里没有了。thread_function()没有执行完,它是一个daemon线程,所以当_main__执行到达它的末尾时,程序结束,后台线程也就结束了。
线程实例的.join()方法
守护线程很方便,但是,如果要实现线程完全执行,而不是被迫退出,应该怎么办?现在让我们回到原始程序,看看注释掉的那一行:
# x.join()
要让一个线程等待另一个线程完成,可以调用.join()。取消对该行的注释,主线程将暂停并等待线程x,直到它运行结束。
你是否在程序中用守护线程或普通线程测试了这个问题?这并不重要。如果执行某个线程的.join()方法,该语句将一直等待,直到每个线程都完成。
使用多线程
到目前为止,示例代码只使用了两个线程:一个是主线程,另一个是以threading.Thread对象开始的线程。
通常,您会希望启动更多线程并让它们做一些有趣的工作。我们先来看看比复杂的方法,然后再看比较简单的方法。
启动多线程比较复杂的方法是你已经知道的:
import logging
import threading
import time
def thread_function(name):
logging.info("Thread %s: starting", name)
time.sleep(2)
logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
threads = list()
for index in range(3):
logging.info("Main : create and start thread %d.", index)
x = threading.Thread(target=thread_function, args=(index,))
threads.append(x)
x.start()
for index, thread in enumerate(threads):
logging.info("Main : before joining thread %d.", index)
thread.join()
logging.info("Main : thread %d done", index)
这段代码使用与上面看到的相同机制来启动线程,创建一个Thread实例对象,然后调用.start()。程序中生成一个由Thread实例组成的列表,后面再调用每个实例.join()方法。
多次运行此代码可能会产生一些有趣的结果。下面是我的机器的输出示例:
$ ./multiple_threads.py
Main : create and start thread 0.
Thread 0: starting
Main : create and start thread 1.
Thread 1: starting
Main : create and start thread 2.
Thread 2: starting
Main : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main : thread 0 done
Main : before joining thread 1.
Main : thread 1 done
Main : before joining thread 2.
Main : thread 2 done
如果仔细检查输出,你将看到所有三个线程都按照你可能期望的顺序开始,但在本例中,它们是按照相反的顺序完成的!多次运行将产生不同的排序,可以通过查找Thread x: finishing消息来了解每个线程何时完成。
线程的运行顺序由操作系统决定,很难预测,它可能(而且很可能)因运行而异,因此在设计使用线程的算法时需要注意这一点。
幸运的是,Python提供了几个模块,你稍后将看到这些模块用来帮助协调线程并使它们一起运行。在此之前,让我们看看如何更简单地管理一组线程。
使用ThreadPoolExecutor
有一种比上面看到的更容易启动多线程的方法,它被称为ThreadPoolExecutor,是标准库中的concurrent.futures的一员(从Python3.2开始)。
创建它的最简单方法是使用上下文管理器的with语句,用它实现对线程池的创建和销毁。
下面是为了使用ThreadPoolExecutor而重写的上一个示例中的__main__部分代码:
import concurrent.futures
# [rest of code]
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.map(thread_function, range(3))
代码创建了一个ThreadPoolExecutor作为上下文管理器,告诉它需要在线程池中有多少个工作线程。然后它使用.map()遍历可迭代对象,在上面的例子中是range(3),将每个可迭代对象传递给线程池中的一个线程。
with语句块的尾部,默认会调用ThreadPoolExecutor的每个线程的.join()方法,建议你尽可能使用ThreadPoolExecutor作为上下文管理器,这样你就永远不会忘记对执行线程.join()。
注意:使用ThreadPoolExecutor可能会导致一些混乱的错误。
例如,如果调用不带参数的函数,但在.map()中传了参数,则线程应当抛出异常。
不幸的是,ThreadPoolExecutor隐藏了该异常,并且(在上面的情况下)程序将在没有输出的情况下终止。一开始调试可能会很混乱。
运行正确的示例代码将生成如下输出:
$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing
同样,请注意Thread 1是在Thread 0之前完成的,线程执行顺序的调度是由操作系统完成的,所遵循的计划也不易理解。
竞态条件
在讨论Python线程的其他特性之前,让我们先讨论一下编写线程程序时遇到的一个更困难的问题:竞态条件。
一旦你了解了什么是竞态条件,并看到了正在发生的情况,然后就使用标准库提供的模块,以防止这些竞态条件的出现。
当两个或多个线程访问共享数据或资源时,可能会出现竞态情况。在本例中,你将创建一个每次都发生的大型竞态条件,但请注意,大多数它并不是很明显。示例中的情况通常很少发生,而且会产生令人困惑的结果。可以想象,因为竞态条件而引起的bug很难被发现。
幸运的是,在下述示例中竞态问题每次都会发生,你将详细地了解它以便解释发生了什么。
对于本例,将编写一个更新数据库的类。你不会真的有一个数据库:你只是要伪造它,因为这不是本文的重点。
FakeDatabase类中有.__init__() 和 .update()方法:
class FakeDatabase:
def __init__(self):
self.value = 0
def update(self, name):
logging.info("Thread %s: starting update", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", name)
FakeDatabase中的属性.value,用于作为竞态条件中共享的数据。
.__init__()中将.value值初始化为0.,到目前为止,一切正常。
.update() 看起来有点奇怪,它模拟从数据库中读取一个值,对其进行一些计算,然后将一个新值写回数据库。
所谓从数据库中读取,即将.value的值复制到本地变量。计算就是在原值上加1,然后.sleep() 一小会儿。最后,它通过将本地值复制回.value,将值写回去。
下面是FakeDatabase的使用方法:
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
database = FakeDatabase()
logging.info("Testing update. Starting value is %d.", database.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for index in range(2):
executor.submit(database.update, index)
logging.info("Testing update. Ending value is %d.", database.value)
程序中创建了两个ThreadPoolExecutor,然后对每个线程调用.submit(),告诉它们运行database.update()。
.submit()有一个明显特征,它允许将位置参数和命名参数传给线程中运行的函数:
.submit(function, *args, **kwargs)
在上面的用法中,index作为第一个也是唯一一个位置参数传给database.update()。你将在本文后面看到,可以用类似的方式传多个参数。
由于每个线程都运行.update(),而.update()会让.value的值加1,因此在最后打印时,你可能会希望database.value为2。但如果是这样的话,你就不会看这个例子了。如果运行上述代码,则输出如下:
$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
你可能已经预料到这种情况会发生,但是让我们来看看实际情况的细节,因为这将使这个问题的解决方案更容易理解。
单线程
在用两个线程深入讨论这个问题之前,让我们先退一步,谈谈线程工作流程的一些细节。
我们不会在这里深入讨论所有的细节,因为这种全面深入的讨论现在并不重要。我们还将简化一些事情,这种做法虽然在技术上并不准确,但会让你对正在发生的事情有正确的认识。
当你告诉ThreadPoolExecutor运行每个线程时,也就是告诉它要运行哪个函数以及要传给它的参数:executor.submit(database.update, index)。
其结果是线程池中的每个线程都将调用database.update(index)。注意,database是__main__中创建的FakeDatabase实例对象,调用它的方法.update()。
每个线程都将引用同一个FakeDatabase的实例database,每个线程还将有一个唯一的值index。为了让上述过程更容易理解,请看下图:

当某线程开始运行.update()时,它有此方法的本地的数据,即.update()中的local_copy。这绝对是件好事,否则,在两个线程中运行同一个函数就会互相干扰了。这意味着该函数的所有作用域(或本地)变量对于线程来说都是安全的。
现在,你已经理解,如果使用单个线程和对.update()的单个调用来运行上面的程序会发生什么情况。
如果只运行一个线程,如下图所示,会一步一步地执行.update()。下图中,语句显示在上面,下面用图示方式演示了线程中的local_value和共享的database.value 中的值的变化:

按照时间顺序,从上到下观察上面的示意图,从创建线程Thread 1开始,到Thread 1结束终止。
Thread 1启动时,FakeDatabase.value为零。方法中的第一行代码local_copy=self.value将0复制到局部变量。接下来,使用local_copy+=1语句增加local_copy的值。你可以看到Thread 1中的.value值为1。
然后,调用下一个time.sleep(),这将使当前线程暂停并允许其他线程运行。因为在这个例子中只有一个线程,所以这没有影响。
当Thread 1唤醒并继续时,它将新值从local_copy复制到FakeDatabase.value,然后线程完成。你可以看到database.value为1。
到目前为止,一切正常。你只运行了一次.update()并且将FakeDatabase.value递增为1。
两个线程
回到竞态条件,两个线程并行,但不是同时运行。每个线程都有自己的local_copy,并指向相同的database,正是这个共享数据库对象导致了这些问题。
程序还是从Thread 1执行.update()开始:

当Thread 1调用time.sleep()时,它允许另一个线程开始运行。这就是事情变得有趣的地方。
Thread 2启动并执行相同的操作。它也将database.value复制到其私有的local_copy,而此时共享的database.value尚未更新:

当Thread 1进入睡眠状态时,共享的database.value仍然未被修改,还是0,而此时的local_copy的两个私有版本的值都为1。
Thread 1现在醒来并保存其local_copy的值,然后线程终止,给Thread 2机会。Thread 2不知道在它睡眠时Thread 1运行并更新了database.value的值。Thread 2也将它的local_copy值存储到database.value中,并将其设置为1:

这两个线程交替访问一个共享对象,覆盖彼此的结果。当一个线程释放内存或在另一个线程完成访问之前关闭文件句柄时,可能会出现类似的竞态。
为什么这不是一个愚蠢的示例
上面的例子是刻意而为,目的是确保每次运行程序时都会发生竞态。因为操作系统可以在任何时候交换线程,所以在读取x的值之后,并且在写回递增的值之前,可以中断类似x=x+1的语句。
发生这种情况的原因细节非常有趣,但这篇文章的其余部分并不需要这些细节,所以可以跳过这个隐藏的部分。
既然你已经看到了运行过程中的竞态条件,让我们找出解决问题的方法!
使用锁实现同步
有很多方法可以避免或解决竞态。你不会在这里看到所有这些方法,但是有一些方法是经常使用的。让我们从Lock开始。
要解决上述竞态条件,需要找到一种方法,使得在代码的“读-修改-写”操作中一次只允许一个线程。最常见的方法是使用Python中名为Lock的方法。在其他的一些语言中,类似的被称为Mutex,Mutex源于MUTual EXclusion,这正是Lock的作用。
Lock像是通行证,一次只能有一个线程拥有Lock,任何其他想要Lock的线程都必须等到Lock的所有者放弃它。
执行此操作的基本函数是.acquire() 和 .release()。线程将调用my_lock.acquire()来获取自己的锁。如果锁已经被其他线程所有,则将等待它被释放。这里有一点很重要,如果一个线程得到了锁,但尚未返回,你的程序将被卡住。你稍后会读到更多关于这方面的内容。
幸运的是,Python的Lock也将作为上下文管理器运行,因此你可以在一个带有with的语句中使用它,并且当with代码块由于任何原因退出时,锁也会自动释放。
让我们看看添加了锁的FakeDatabase,其所调用函数保持不变:
class FakeDatabase:
def __init__(self):
self.value = 0
self._lock = threading.Lock()
def locked_update(self, name):
logging.info("Thread %s: starting update", name)
logging.debug("Thread %s about to lock", name)
with self._lock:
logging.debug("Thread %s has lock", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.debug("Thread %s about to release lock", name)
logging.debug("Thread %s after release", name)
logging.info("Thread %s: finishing update", name)
除了添加一堆调试日志以便更清楚地看到锁操作之外,这里的大变化是添加一个名为._lock的属性,它是一个threading.Lock()实例对象。这个._lock在未锁定状态下初始化,并由with语句锁定和释放。
这里值得注意的是,运行此方法的线程将一直保持Lock,直到完全完成对数据库的更新。在这种情况下,这意味着函数将在复制、更新、休眠时保持锁定,然后将值写回数据库。
如果在日志记录设置为警告级别的情况下运行此版本,你将看到以下内容:
$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
看看这个。你的程序终于成功了!
在__main__中配置日志输出后,可以通过添加以下语句将级别设置为DEBUG来打开完整日志记录:
logging.getLogger().setLevel(logging.DEBUG)
在启用DEBUG后,运行此程序,如下所示:
$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.
在输出中,你可以看到Thread 0得到了锁,并在进入睡眠状态时仍保持锁定。然后Thread 1启动并尝试获取相同的锁。因为Thread 0仍在持有锁,Thread 1必须等待。这就是Lock的互斥性。
本文其余部分中的许多示例将日志设置为WARNING和DEBUG级别。我们通常只是DEBUG级别的输出,因为DEBUG日志可能非常长。在日志记录打开的情况下尝试这些程序,看看它们能做什么。
死锁
在继续探索之前,应该先看看使用锁时的一个常见问题。如你所见,如果已经获取了Lock,则对.acquire()的二次调用将等到持有Lock的线程调用.release()。运行此代码时,你认为会发生什么情况?
import threading
l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
当程序第二次调用l.acquire()时,该函数将挂起,等待Lock的释放。在本例中,可以通过删除第二次调用来修复死锁,但死锁通常发生在以下两个微妙的事情之一:
- 未正确释放
Lock的错误。 - 设计问题,其中一个函数需要由某些函数调用,这些函数可能具有或可能不具有
Lock。
第一种情况有时会发生,但使用Lock作为上下文管理器会大大减少错误出现的频率。建议尽可能使用上下文管理器编写代码,因为它们有助于避免异常跳过.release()调用的情况。
在某些语言中,设计问题可能要复杂一些。值得庆幸的是,Python线程的又一个对象RLock就是为这种情况而设计的。它允许线程在调用.release()之前多次通过.acquire()实现RLock。该线程中调用.release()的次数与调用.acquire()的次数相同。
Lock和RLock是线程中用来防止竞态条件的两个基本工具,还有一些其他工具以不同的方式发挥作用。在你查看它们之前,让我们转到一个稍微不同的问题上。
生产者-消费者线程
生产者-消费者问题(Producer-Consumer Problem,以下简称:PCP)是计算机科学中研究线程或进程同步的代表性问题,下面要通过它的一个变体来了解Python中threading模块提供的各种方法。
对于本例,你将想象一个程序需要从网络读取信息并将其写入磁盘。程序会确定是否要请求信息。它必须监听并接受信息,这些信息不会以正常的速度传入,而是会以突发的方式传入。程序的这一部分叫做生产者。
另一方面,一旦收到信息,你就需要将其写入数据库。数据库访问速度很慢,但这个速度足以跟上信息传输的平均速度。当一大堆信息进来时,访问速度还不够快。这部分是消费者。
在生产者和消费者之间,创建一个Pipeline,它将随着你对不同的同步对象的了解而变化。
这是基本的布局。让我们看看使用Lock的解决方案。它并不完美,但它使用的工具是你已经知道的,所以这是一个很好的开始。
使用锁的PCP
因为这是一篇关于Python的threading模块的文章,而且你刚刚阅读了Lock的使用方法,,所以让我们尝试用一两个使用Lock的线程来解决这个问题。
一般的设计是,有一个producer线程从模拟网络读取消息并将信息放入Pipeline:
import random
SENTINEL = object()
def producer(pipeline):
"""Pretend we're getting a message from the network."""
for index in range(10):
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")
# Send a sentinel message to tell consumer we're done
pipeline.set_message(SENTINEL, "Producer")
要生成模拟信息,producer中会生成一个介于1和101(不含101)之间的随机整数,然后调用pipeline的.set_message(),将其发送到consumer。
producer还使用SENTINEL值作为标记,当向consumer发送了10个值,就停止发送。这有点尴尬,但不要担心,在完成这个示例之后,你将看到消除这个SENTINEL值的方法。
在pipeline的另一边是消费者:
def consumer(pipeline):
"""Pretend we're saving a number in the database."""
message = 0
while message is not SENTINEL:
message = pipeline.get_message("Consumer")
if message is not SENTINEL:
logging.info("Consumer storing message: %s", message)
consumer从pipeline中读取一条信息并将其写入一个虚拟数据库,在本例中,只是将信息打印到显示器上。如果它得到SENTINEL值,就结束函数执行过程,该函数将终止线程。
在看真正有趣Pipeline部分之前,这里是__main__的代码,它产生了以下线程:
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)
pipeline = Pipeline()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline)
executor.submit(consumer, pipeline)
这看起来应该相当熟悉,因为它接近前面示例中的__main__代码。
请记住,你可以通过取消注释行打开DEBUG日志记录,以查看所有日志记录消息:
# logging.getLogger().setLevel(logging.DEBUG)
通过DEBUG日志信息来查看每个线程获取和释放锁的确切位置是值得的。
现在让我们看看将信息从producer传递给消费者的管道:
class Pipeline:
"""
Class to allow a single element pipeline between producer and consumer.
"""
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.consumer_lock.acquire()
def get_message(self, name):
logging.debug("%s:about to acquire getlock", name)
self.consumer_lock.acquire()
logging.debug("%s:have getlock", name)
message = self.message
logging.debug("%s:about to release setlock", name)
self.producer_lock.release()
logging.debug("%s:setlock released", name)
return message
def set_message(self, message, name):
logging.debug("%s:about to acquire setlock", name)
self.producer_lock.acquire()
logging.debug("%s:have setlock", name)
self.message = message
logging.debug("%s:about to release getlock", name)
self.consumer_lock.release()
logging.debug("%s:getlock released", name)
哇!这么多代码。其中相当大的一部分只是日志语句,以便在运行代码时更容易看到发生了什么。下面是删除所有日志记录语句后的相同代码:
class Pipeline:
"""
Class to allow a single element pipeline between producer and consumer.
"""
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.consumer_lock.acquire()
def get_message(self, name):
self.consumer_lock.acquire()
message = self.message
self.producer_lock.release()
return message
def set_message(self, message, name):
self.producer_lock.acquire()
self.message = message
self.consumer_lock.release()
这似乎更容易处理。此版本代码中的Pipeline有三个成员:
.message存储要传递的信息。.producer_lock是threading.Lock实例对象,在producer线程中,用它控制对信息的访问.consumer_lock也是threading.Lock实例对象,它在consumer线程控制对信息的访问。
__init__()初始化这三个成员,然后调用.consumer_lock上的.acquire()。这是你想开始的状态。允许producer添加新信息,但consumer需要等待信息出现。
.get_message() 和 .set_messages()几乎相反。.get_message()调用consumer_lock上的.acquire(),它让consumer等待信息准备就绪。
一旦consumer获得了.consumer_lock,它就会复制出.message中的值,然后调用.producer_lock上的.release(),释放锁,允许producer将下一条信息插入到pipeline中。
在运行.set_message()之前,要注意.get_message()中的一个细节,通常以return self.message结束方法,但是此处不这样做,看看你能否弄清楚原因。
答案在此。一旦consumer调用.producer_lock.release(),它就会与producer交换位置,producer开始运行,这种情况可能在.release()返回之前发生!这意味着,当函数returns self.message时,有比较小的概率会生成下一条信息,因此你将丢失第一条信息。这是另一个竞态的例子。
转到.set_message(),可以看到事务的另一面,producer会用一条信息来调用它,获取.producer_lock,设置.message,然后调用consumer_lock上的.release()。这样就使得用户可以读取该值。
将日志设置为WARNING并执行代码,看看它是什么样子的:
$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22
一开始,你可能会发现奇怪的是,producer在consumer运行之前就收到两条信息。如果回顾一下producer和.set_message(),你会注意到,当producer视图将信息发送到pipeline时,会等待Lock。这是在producer收到信息和日志之后完成的。
当producer尝试发送第二条信息时,它将第二次调用.set_message(),并且它将被锁定。
操作系统可以在任何时候交换线程,但它通常会让每个线程在交换之前有一个合理的运行时间。这就是为什么producer通常运行到它在第二次调用.set_message()时被锁定为止。
但是,一旦某个线程被锁定,操作系统就会将其交换出去,并找到另一个要运行的线程,此时的另一个线程就是consumer。
consumer调用.get_message(),该函数读取信息并调用.producer_lock上的.release(),从而允许producer在下次交换线程时再次运行。
注意,第一条消息是43,这正是consumer读的内容,尽管 producer已经生成了45这条信息。
以上是有限的测试,并没有很好地解决PCP,因为它一次只允许管道中的有一个值。当producer收到大量信息时,它将无处安放这些信息。
让我们使用Queue寻找一个更好的方法来解决这个问题。
将队列应用于PCP
如果你希望一次能够处理管道中的多个值,就需要一种针对管道的数据结构,它相当于producer的备份,能实现数量增加和减少。
Python标准库有一个queue模块,该模块有一个Queue 类,下面将Pipeline改为Queue,就可以不再使用Lock锁定某些变量,此外,还将使用Python的threading模块中的Event来停止工作线程,这是一种与以往不同的方法。
从Event开始。当有很多线程等待threading.Event实例的时候,它能够将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不一定需要停止它们正在做的事情,它们可以每隔一段时间检查一次Event的状态。
很多事情都可以触发event。在本例中,主线程将简单地休眠一段时间,然后运行.set():
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)
pipeline = Pipeline()
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
这里唯一的变化是创建了event对象,然后将event作为参数传给后面的.submit方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用event.set()。
producer也不需要改变太多:
def producer(pipeline, event):
"""Pretend we're getting a number from the network."""
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")
logging.info("Producer received EXIT event. Exiting")
while循环中不再为pipeline设置SENTINEL值。consumer需要相应做较大改动:
def consumer(pipeline, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not pipeline.empty():
message = pipeline.get_message("Consumer")
logging.info(
"Consumer storing message: %s (queue size=%s)",
message,
pipeline.qsize(),
)
logging.info("Consumer received EXIT event. Exiting")
必须删除SENTINEL值相关的代码,while循环的条件也因此更复杂了一些,现在需要考虑not event.is_set()和not pipeline.empty()两个条件,也就是未设置event,或者pipeline未清空时。
要确保在consumer进程结束是队列中已经是空的了,否则就会出现以下两种糟糕的情况。一是丢失了这些最终消息,但更严重的情况是第二种,producer如果视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在producer验证.is_set()条件之后,调用pipeline.set_message()之前。
这种事件会发生在producer验证.is_set()条件之后,调用pipeline.set_message()之前。
如果发生这种情况,producer可能会在队列仍然全满的情况下唤醒并退出。然后,调用.set_message(),.set_message()将一直等到队列中有新信息的空间。若consumer已经退出,这种情况就不会发生,而且producer不会退出。
consumer中的其他部分看起来应该很熟悉。
然而,Pipeline还需要重写:
class Pipeline(queue.Queue):
def __init__(self):
super().__init__(maxsize=10)
def get_message(self, name):
logging.debug("%s:about to get from queue", name)
value = self.get()
logging.debug("%s:got %d from queue", name, value)
return value
def set_message(self, value, name):
logging.debug("%s:about to add %d to queue", name, value)
self.put(value)
logging.debug("%s:added %d to queue", name, value)
上面的Pipeline是queue.Queue的子类。Queue 在初始化时指定一个可选参数,以指定队列的最大长度。
如果为maxsize指定一个正数,则该数字为队列元素个数的极限,如果达到该值,.put()方法被锁定,直到元素的数量少于maxsize才解锁。如果不指定maxsize,则队列将增长到计算机内存的所许可的最值。
.get_message()和.set_message()两个方法代码更少了,它们基本上把.get()和.put()封装在Queue中。你可能想知道防止线程发生竞态条件的锁都去了哪里。
编写标准库的核心开发人员知道,Queue经常在多线程环境中使用,于是将锁合并到Queue本身中。Queue对于线程来说是安全的。
此程序的运行如下所示:
$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31
[many lines deleted]
Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting
通读上述示例的输出,会发现,有的地方很有意思。在顶部,你可以看到producer必须创建5条信息并将其中4条放在队列中,队列中最前面的一条被操作系统换掉之后,第5条条信息才能加入队列。
然后consumer运行,把第1条信息拉了出来,它打印出了该信息以及队列在此时的长度:
Consumer storing message: 32 (queue size=3)
此时,标明第5条信息还没有进入pipeline ,删除单个信息后queue的减小到3。你也知道queue可以保存10条消息,因此queue线程不会被queue阻塞,它被操作系统置换了。
注意:你调试的输出结果会有所不同。你的输出将随着运行次数的不同而改变。这就是用线程工作的乐趣所在!
执行代码,你能看到主线程生成event事件,这会导致producer立即退出,consumer还有很多工作要做,所以它会一直运行,直到清理完pipeline。
尝试操作大小不同的队列,并调用producer或consumer中的time.sleep(),以分别模拟更长的网络或磁盘访问时间。即使对程序的这些内容稍加更改,也会使结果产生很大差异。
这是解决发PCP的一个好方法,但是你可以进一步简化它,不需要使用Pipeline,一旦去掉日志记录,它就会变成一个queue.Queue。
下面是直接使用queue.Queue的最终代码:
import concurrent.futures
import logging
import queue
import random
import threading
import time
def producer(queue, event):
"""Pretend we're getting a number from the network."""
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
queue.put(message)
logging.info("Producer received event. Exiting")
def consumer(queue, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
"Consumer storing message: %s (size=%d)", message, queue.qsize()
)
logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
这更易于阅读,并展示了如何使用Python的内置模块来简化复杂的问题。
Lock 和 Queue是便于解决并发问题的类,但标准库还提供了其他类。在结束本文之前,让我们浏览其中一些类。
Threading
Python的threading模块还提供了一些类,虽然上面的示例不需要这些,但是它们在不同的用例中可以派上用场,所以熟悉它们是有好处的。
Semaphore
threading.Semaphore有一些特殊属性的计数器对象,这里实现的计数具有原子性,意味着可以保证操作系统不会在递增或递减计数器的过程中交换线程。
内部计数器在调用.release()时递增,在调用.acquire()时递减。
另外一个特殊属性,如果一个线程在计数器为零时调用.acquire(),则该线程将被锁定,直到另一个线程调用.release(),并将计数器增加到1。
Semaphores通常用于保护容量有限的资源。例如,如果你有一个连接池,并且希望将该池的大小限制为特定的数目。
Timer
threading.Timer用于在经过一定时间后调度要调用的函数,你可以通过传入等待的秒数和调用的函数来创建Timer实例:
t = threading.Timer(30.0, my_function)
通过调用.start()启动Timer。在指定时间之后的某个时间点,将在新线程上调用该函数。但请注意,无法保证会在你希望的时间准确调用该函数。
如果要停止已经启动的Timer,可以调用.cancel()。如果在Timer触发后调用.cancel(),不会执行任何操作,也不会产生异常。
Timer可用于在特定时间后提示用户执行操作。如果用户在Timer过期之前执行操作,则可以调用.cancel()。
Barrier
threading.Barrier可用于保持固定数量的线程同步。创建Barrier时,调用方必须指定将要同步的线程数。每个线程都调用Barrier的.wait()方法,它们都将保持封锁状态,直到指定数量的线程在等待,然后全部同时释放。
请记住:线程是由操作系统调度的,因此,即使所有线程都是同时释放的,它们也将被调度为一次运行一个线程。
Barrier的一个用途是允许线程池对自身进行初始化。让这些线程初始化后在Barrier上等待,将确保在所有线程完成初始化之前,没有一个线程开始运行。
结论:Python中的线程
现在你已经了解了Python的threading提供的许多功能,以及一些如何写线程程序和用线程程序解决问题的示例。你还看到了在编写和调试线程程序时出现的一些问题。
原文链接:https://realpython.com/intro-to-python-threading/
关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。