源码解密协程队列和线程队列的成功原理 (协议解析程序)
本次来聊一聊/target=_blankclass=infotextkey>Python的队列,首先队列是一种不凡的线性表,具备先进先出(FIFO)的个性,这象征着元素的入队顺序和出队顺序是分歧的。
队列理论用于存储须要按顺序处置的数据,例如义务调度。当然队列最经常出现的一个运行场景就是解耦,一个线程不停地消费数据,放到队列里,另一个线程从队列中取数据启动消费。
而Python也提供了队列,区分是协程队列和线程队列。
importasyncioimportqueue#协程队列coroutine_queue=asyncio.Queue()#线程队列threading_queue=queue.Queue()
假设你的程序基于asyncio,那么应该经常使用协程队列,假设你的程序驳回了多线程,那么应该经常使用线程队列。
上方咱们来看一看这两种队列的API,以及底层成功原理。
协程队列
协程队列的详细成功由asyncio提供,以下是它的一些用法。
importasyncioasyncdefmn():#创立队列时可以指定能够存储的最大元素个数#不指定则没有容量限度queue=asyncio.Queue(maxsize=20)#前往容量print(queue.maxsize)"""20"""#减少元素,假设队列满了会阻塞,直到有残余空间awaitqueue.put(111)#减少元素,假设队列满了会抛意外#由于不须要阻塞期待,所以put_nowait不是协程函数queue.put_nowait(222)#队列能否已满print(queue.full())"""False"""#前往队列外部的元素个数print(queue.qsize())"""2"""#从队列中失掉元素,假设队列为空,会阻塞,直到队列中有可用元素print(awaitqueue.get())"""111"""#从队列中失掉元素,假设队列为空,会抛意外#由于不须要阻塞期待,所以put_nowait不是协程函数print(queue.get_nowait())"""222"""#队列能否为空print(queue.empty())"""True"""asyncio.run(main())
所以协程队列的API很便捷,咱们再列举一下:
图片
而后,协程队列还有两个API,须要独自说明,区分是task_done()和join()。
首先在协程队列外部有一个_unfinished_tasks属性,初始值为0,每当往队列减少一个元素时,该属性的值就会自增1。但是从队列取出元素时,该属性不会智能减1,须要手动调用task_done()方法。
所以_unfinished_tasks记载了队列中有多少个义务数据须要处置,每来一个智能加1,但取走一个不会智能减1,而是须要task_done来成功。
而后join()的作用是,当_unfinished_tasks不为0的时刻,awaitqueue.join()会阻塞,直到为0。
importasyncioasyncdefconsumer(queue,n):print(f"consumer{n}开局消费")awaitasyncio.sleep(3)awaitqueue.get()#失掉数据后,调用task_donequeue.task_done()print(f"consumer{n}消费终了")asyncdefmain():queue=asyncio.Queue()awaitqueue.put(123)awaitqueue.put(456)awaitqueue.put(789)#队列外面有三个数据,开启三个消费者去消费awaitasyncio.gather(consumer(queue,1),consumer(queue,2),consumer(queue,3),)#这里会堕入阻塞,直到_unfinished_tasks变为0awaitqueue.join()print("main解除阻塞")asyncio.run(main())"""consumer1开局消费consumer2开局消费consumer3开局消费consumer1消费终了consumer2消费终了consumer3消费终了main解除阻塞"""
还是比拟便捷的,而后咱们来看一下协程队列的详细成功细节。
图片
首先协程队列外部有一个_queue属性,它是一个双端队列,担任保留详细的元素。由于要保障两端的操作都是高效的,所以驳回双端队列成功。
而后是_getters和_putters两个属性,它们是做什么的呢?在队列满了的时刻,协程往队列减少元素时会堕入阻塞,等到队列有残余空间时会解除阻塞。同理,在队列为空时,协程从队列失掉元素时会堕入阻塞,等到队列有可用元素时会解除阻塞。
那么这个阻塞期待,以及智能唤醒并解除阻塞是怎样成功的呢?在引见锁和信号量的时刻,咱们剖析过整个成功环节,协程队列与之相似。
假定协程从队列失掉元素,但是队列为空,于是会创立一个Future对象,并保留起来,以后保留的中央就是_getters,它也是双端队列。而后awaitfuture,此时就会堕入阻塞,当其它协程往队列中减少元素时,会将_getters外面的future弹出,设置结果集。因此awaitfuture的协程就会解除阻塞,由于队列有可用元素了。
同理,协程往队列减少元素也是如此,假设队列满了,雷同创立一个Future对象,并保留起来,以后保留的中央就是_putters。而后awaitfuture,堕入阻塞,当其它协程从队列中取出元素,会将_putters外面的future弹出,设置结果集。因此awaitfuture的协程就会解除阻塞,由于队列有可用空间了。
图片
三个外部调用的方法,_get方法担任从队列的头部弹出元素,_put方法担任从队列的尾部追加元素,比拟便捷。而后是_wakeup_next方法,它担任唤醒阻塞的协程。参数waiters要么是_getters,要么是_putters,从外面弹出一个future,设置结果集,让对应的协程解除阻塞。
图片
图片
而后看看put_nowait和get_nowait,首先是put_nowait,往队列减少元素。
假设减少时发现队列已满,那么抛出意外。假设未满,则调用_put方法往_queue外面减少元素,由于元素的实践存储是由self._queue这个双端队列担任的。
减少终了后,将_unfinished_task加1。最后从_getters外面弹出future,设置结果集,让因失掉不到元素而堕入阻塞的协程解除阻塞(同时会将减少的元素取走)。
get_nowait的逻辑也很便捷,假设队列为空,间接抛意外。假设不为空,则调用_get方法从队列中弹出元素。最后从_putters外面弹出future,设置结果集,让因队列已满、不可减少元素而堕入阻塞的协程解除阻塞(同时会将元素减少进队列)。
再来看看put方法的成功细节:
图片
结果和咱们之前剖析的一样,只是源码外部多做了一些意外检测。再来看看get方法,它的成功细节和put是相似的。
图片
比拟便捷,还是没什么难度的,最后再来看看task_done和join两个方法。
图片
协程队列外面经常使用了asyncio.Event,它示意事情,假设事情对象没有调用set方法设置标记位,那么调用wait方法时会堕入阻塞。当事情对象调用set方法时,wait会解除阻塞。
所以协程队列的join方法的逻辑就是,当_unfinished_tasks大于0时,调用事情对象的wait方法堕入阻塞。
而task_done方法的作用就是将_unfinished_tasks减1,当它的值属性为0时,调用事情对象的set方法,让join解除阻塞。
以上就是整个协程队列的成功细节,详细的元素存储是由collections.deque来承载的。并在队列已满或许为空时,经过Future对象来成功阻塞期待和智能唤醒。
另外除了先进先出队列之外,还有先进后出队列,普通称为LIFO队列,它的成果相似于栈。
图片
这个没什么好说的,由于是先进后出,所以减少和弹出都在同一端,间接经常使用列表成功即可。并且由于LifoQueue承袭Queue,所以它的API和普通的协程队列是一样的。
除了先进先出队列,还有一个优先队列。
图片
它的API和普通的协程队列也是分歧的,只不过优先队列在减少元素时,须要指定一个优先级:(优先级,元素),优先级的值越低,示意优先级越高。而后在外部,会依照优先级的高下,保养一个小根堆,堆顶元素便是优先级最高的元素。
这几个队列详细经常使用哪一种,则取决于详细的业务场景。
线程队列
说完了协程队列,再来看看线程队列,它们的API是相似的,但成功细节则不同。由于操作系统感知不到协程,所以协程队列的阻塞期待是基于Future成功的,而线程队列的阻塞期待是基于条件变量(和互斥锁)成功的。
还是先来看看线程队列的一些API,和协程队列是相似的。
fromqueueimportQueue#可以指定一个maxsize参数,示意队列的容量#默以为0,示意队列的容量有限queue=Queue(maxsize=20)#检查容量print(queue.maxsize)"""20"""#检查队列的元素个数print(queue.qsize())"""0"""#判别队列能否已满print(queue.full())"""False"""#判别队列能否为空print(queue.empty())"""True"""#往队列中减少元素#block参数示意能否阻塞,默以为True,当队列已满时,线程会阻塞#timeout示意超时期间,默以为None,示意会有限期待#当然也可以给timeout传一个详细的值#假设在规则期间内,没有将元素放入队列,那么抛意外queue.put(123,block=True,timeout=None)#也是往队列中减少元素,但是当队列已满时,会间接抛意外#put_nowait(item)实质上就是put(item,block=False)queue.put_nowait(456)#从队列中取出元素#雷同可以传递block和timeout参数#block默以为True,当队列为空时会堕入阻塞#timeout默以为None,示意会有限期待print(queue.get(block=True,timeout=None))"""123"""#也是从队列中取出元素,但是当队列为空时,会间接抛意外#get_nowait()实质上就是get(block=False)print(queue.get_nowait())"""456"""#task_done(),将unfinished_tasks属性的值减1print(queue.unfinished_tasks)"""2"""queue.task_done()queue.task_done()print(queue.unfinished_tasks)"""0"""#join(),当unfinished_tasks不为0时,堕入阻塞queue.join()
API和协程队列是相似的,咱们列举一下:
图片
线程队列的详细经常使用咱们曾经知道了,上方来看看它的详细成功。
图片
线程队列的外部照旧经常使用双端队列启动元素存储,并且还经常使用了一个互斥锁和三个条件变量。
为了保障数据的分歧性和线程安保,当队列在多线程环境中被修正(比如减少或删除元素)时,须要经常使用互斥锁。任何须要修正队列的操作都必定在失掉到互斥锁之后启动,以防止多个线程同时对队列启动修正,否则会造成数据不分歧或其它失误。同时,一旦对队列的修正成功,必定立刻监禁互斥锁,以便其它线程可以访问队列。
而后是not_empty条件变量,当一个新元素被减少到队列时,应该向not_empty发送一个信号。这个举措会通知那些想从队列中失掉元素,但因队列为空而堕入阻塞的线程,如今队列中曾经有了新的元素,它们可以继续口头失掉元素的操作。
接上去是not_full条件变量,当从队列中取走一个元素时,应该向not_full发送一个信号。这个举措通知那些想往队列减少元素,但因队列已满而堕入阻塞的线程,如今队列中曾经有了可用空间,它们可以继续口头减少元素的操作。
最后是all_tasks_done条件变量,当处置的义务所有成功,即计数器unfinished_task为0时,应该向all_tasks_done发送一个信号。这个举措会通知那些口头了join()方法而堕入阻塞的线程,它们可以继续往下口头了。
图片
由于线程队列驳回了双端队列存储元素,所以双端队列的长度就是线程队列的元素个数。假设元素个数为0,那么队列就是空;假设容量大于0,并且小于等于元素个数,那么队列就满了。
图片
前面说了,put_nowait和get_nowait实质上就是调用了put和get,所以咱们的重点是put和get两个方法。
图片
以上就是put方法的底层成功,不难了解。说完了put,再来看看get。
图片
最后是task_done和join方法,看看它们的外部逻辑。
图片
调用join方法,当unfinished_task大于0时,会堕入阻塞。调用task_done方法,会将未成功义务数减1,假设为0,那么唤醒阻塞期待的线程。
须要留意的是,唤醒调用的方法不是notify,而是notify_all。关于减少元素和失掉元素,每次显然只能唤醒一个线程,此时调用notify。而unfinished_task为0时,应该要唤醒一切期待的线程,因此要调用notify_all。
最后线程队列也有相应的PriorityQueue和LifoQueue,它们的用法、成功和协程外面的这两个队列是一样的。
小结
以上便是协程队列和线程队列的详细用法和成功原理,它们实质上都是基于双端队列成功详细的元素存储,并且在队列已满和队列为空时,可以阻塞期待。
只不过协程队列是经过Future对象成功的,而线程队列是经过条件变量成功的。
当然,除了协程队列和线程队列,还有进程队列,但进程队列要复杂的多。因此关于进程队列的成功细节,咱们以后专门花篇幅去引见。
C/C++线程安全型队列的实现
首先,互斥量这种线程相关的内容是平台相关的,我假设你用的是windows平台开发。 其次,说明一下我的开发环境,vs2008,控制台程序,空的工程。 最后给你贴代码,分文件来看。 ===头文件QueueNode.h======你需要的节点数据可能不是整数,只要将typedef int QUEUEDATA这一句的int换成你想要的类型即可,但要注意,这个类型必须实现赋值操作符重载,相等比较操作符重载,以及复制构造函数===#ifndef _QUEUE_NODE_H_#define _QUEUE_NODE_H_typedef int QUEUEDATA;typedef struct node{QUEUEDATA data;node* m_pNext;}QUEUENODE;#endif===队列头文件Queue.h,有平台相关内容,请注意===#ifndef _QUEUE_H_#define _QUEUE_H_#include QueueNode.h#include <Windows.h>class ThreadSafeQueue{public:ThreadSafeQueue();virtual ~ThreadSafeQueue();bool InitQueue();void EnQueue(const QUEUEDATA& data);void DeQueue();void Clear();const QUEUENODE* Find(const QUEUEDATA& data) const;void Print();protected:HANDLE m_hMutex;QUEUENODE* m_pQueueHead;};#endif===队列函数实现文件===#include Queue.h#include <iostream>ThreadSafeQueue::ThreadSafeQueue(){m_pQueueHead = new QUEUENODE;m_pQueueHead->m_pNext = 0;}ThreadSafeQueue::~ThreadSafeQueue(){Clear();delete m_pQueueHead;CloseHandle(m_hMutex);}bool ThreadSafeQueue::InitQueue(){m_hMutex = CreateMutex(0, FALSE, 0);return (m_hMutex!=0);}void ThreadSafeQueue::EnQueue(const QUEUEDATA& data){WaitForSingleObject(m_hMutex, INFINITE);QUEUENODE* pNode = new QUEUENODE;pNode->data = data;pNode->m_pNext = 0;QUEUENODE* pTemp = m_pQueueHead;while (pTemp->m_pNext != 0){pTemp = pTemp->m_pNext;}pTemp->m_pNext = pNode;ReleaseMutex(m_hMutex);}void ThreadSafeQueue::DeQueue(){WaitForSingleObject(m_hMutex, INFINITE);QUEUENODE* pNode = m_pQueueHead->m_pNext;if (pNode != 0){m_pQueueHead->m_pNext = pNode->m_pNext;delete pNode;pNode = 0;}ReleaseMutex(m_hMutex);}const QUEUENODE* ThreadSafeQueue::Find(const QUEUEDATA& data) const{WaitForSingleObject(m_hMutex, INFINITE);QUEUENODE* pNode = m_pQueueHead->m_pNext;while (pNode != 0){if (pNode->data == data){break;}pNode = pNode->m_pNext;}return pNode;ReleaseMutex(m_hMutex);}void ThreadSafeQueue::Clear(){WaitForSingleObject(m_hMutex, INFINITE);QUEUENODE* pNode = m_pQueueHead->m_pNext;QUEUENODE* pTemp = 0;while (pNode != 0){pTemp = pNode->m_pNext;delete pNode;pNode = pTemp;}m_pQueueHead->m_pNext = 0;ReleaseMutex(m_hMutex);}void ThreadSafeQueue::Print(){WaitForSingleObject(m_hMutex, INFINITE);QUEUENODE* pNode = m_pQueueHead->m_pNext;while (pNode != 0){std::cout << pNode->data << \t;pNode = pNode->m_pNext;}std::cout << std::endl;ReleaseMutex(m_hMutex);}===测试代码文件,包含了测试用可执行程序,两个操作queue的线程,需要说明的是,我本来打算用WaitMultipleObjects函数来等待两个线程都结束,但是没搞清楚是什么问题没有卡住,不打算继续纠缠它了,所以让主线程Sleep了5秒钟===#include Queue.h#include <iostream>DWORD WINAPI HandleQueue(void* pParam);DWORD WINAPI HandleQueue2(void* pParam);int main(){ThreadSafeQueue queue;();HANDLE hThread[2] = {0};DWORD threadID = 0;hThread[0] = CreateThread(NULL, 0, HandleQueue, (void*)(&queue), NULL, &threadID);hThread[0] = CreateThread(NULL, 0, HandleQueue2, (void*)(&queue), NULL, &threadID);//WaitForMultipleObjects(2, hThread, TRUE, INFINITE);Sleep(5000);();();return 0;}DWORD WINAPI HandleQueue(void* pParam){ThreadSafeQueue* pQueue = reinterpret_cast<ThreadSafeQueue*>(pParam);for (int i = 0; i < 100; i++){std::cout << HandleQueue EnQueue << std::endl;pQueue->EnQueue(i);}for (int i = 0; i < 50; i++){std::cout << HandleQueue DeQueue << std::endl;pQueue->DeQueue();}return 0;}DWORD WINAPI HandleQueue2(void* pParam){ThreadSafeQueue* pQueue = reinterpret_cast<ThreadSafeQueue*>(pParam);for (int i = 0; i < 100; i++){std::cout << HandleQueue2 EnQueue << std::endl;pQueue->EnQueue(i+100);}for (int i = 0; i < 50; i++){std::cout << HandleQueue2 DeQueue << std::endl;pQueue->DeQueue();}return 0;}新建一个空的控制台程序工程,向工程中加入这几个文件,编译之后可以直接运行。 第一个线程投入队列100个元素,出队50个元素,第二个线程同样。 最后主线程输出队列中最后的内容,然后清空。 队列用链表实现,可以试想一下,如果线程同步没有处理,指针操作时一定会引起崩溃
VS 2008 C# Win服务项目 利用FileSystemWatcher加入队列,多线程操作队列问题
为什么要用两种类型的队列?
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。