2025年阻塞队列原理(阻塞队列offer方法)

阻塞队列原理(阻塞队列offer方法)p 作者 郑东旭 云原生实验室 p p class f center p

大家好,我是讯享网,很高兴认识大家。




讯享网

 <p>作者:郑东旭 云原生实验室</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2F5d4c79e0j00qd4q020007c000hs008oc.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>Docker 技术鼻祖系列</p><p><blockquote>文末直接送 5 本《Kubernetes 源码剖析》。<br/></blockquote></p><p>在 Kubernetes 系统中,组件之间通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 机制。Kubernetes 的其他组件都是通过 client-go 的 Informer 机制与 Kubernetes API Server 进行通信的。</p><p>而 Informer 又需要和 Reflector、Delta FIFO Queue、Workqueue 等协同工作,具体可以参考我之前的一篇文章:从 Kubernetes 资源控制到开放应用模型,控制器的进化之旅。</p><p>本文主要通过 WorkQueue 的源码来分析其工作原理。</p><p>WorkQueue 称为工作队列,Kubernetes 的 WorkQueue 队列与普通 FIFO(先进先出,First-In, First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。</p><p><ul><li><strong>有序</strong>:按照添加顺序处理元素(item)。</li><li><strong>去重</strong>:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。</li><li><strong>并发性</strong>:多生产者和多消费者。</li><li><strong>标记机制</strong>:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。</li><li><strong>通知机制</strong>:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。</li><li><strong>延迟</strong>:支持延迟队列,延迟一段时间后再将元素存入队列。</li><li><strong>限速</strong>:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。</li><li><strong>Metric</strong>:支持 metric 监控指标,可用于 Prometheus 监控。</li></ul></p><p>WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。</p><p><ul><li><strong>Interface</strong>:FIFO 队列接口,先进先出队列,并支持去重机制。</li><li><strong>DelayingInterface</strong>:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。</li><li><strong>RateLimitingInterface</strong>:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制。</li></ul>1. FIFO 队列</p><p>FIFO 队列支持最基本的队列方法,例如插入元素、获取元素、获取队列长度等。另外,WorkQueue 中的限速及延迟队列都基于 Interface 接口实现,其提供如下方法:</p><p></p><p>FIFO 队列 Interface 方法说明如下。</p><p><ul><li><strong>Add</strong>:给队列添加元素(item),可以是任意类型元素。</li><li><strong>Len</strong>:返回当前队列的长度。</li><li><strong>Get</strong>:获取队列头部的一个元素。</li><li><strong>Done</strong>:标记队列中该元素已被处理。</li><li><strong>ShutDown</strong>:关闭队列。</li><li><strong>ShuttingDown</strong>:查询队列是否正在关闭。</li></ul></p><p>FIFO 队列数据结构如下:</p><p></p><p>FIFO 队列数据结构中最主要的字段有 queue、dirty 和 processing。其中 queue 字段是实际存储元素的地方,它是 slice 结构的,用于保证元素有序;dirty 字段非常关键,除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;processing 字段用于标记机制,标记一个元素是否正在被处理。应根据 WorkQueue 的特性理解源码的实现,FIFO 存储过程如图 5-9 所示:</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2Fc46f38e0j00qd4q02000ec000ko003uc.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>图5-9 FIFO存储过程</p><p>通过 Add 方法往 FIFO 队列中分别插入 1、2、3 这 3 个元素,此时队列中的 queue 和 dirty 字段分别存有 1、2、3 元素,processing 字段为空。然后通过 Get 方法获取最先进入的元素(也就是 1 元素),此时队列中的 queue 和 dirty 字段分别存有 2、3 元素,而 1 元素会被放入 processing 字段中,表示该元素正在被处理。最后,当我们处理完 1 元素时,通过 Done 方法标记该元素已经被处理完成,此时队列中的 processing 字段中的 1 元素会被删除。</p><p>如图 5-9 所示,这是 FIFO 队列的存储流程,在正常的情况下,FIFO 队列运行在并发场景下。高并发下如何保证在处理一个元素之前哪怕其被添加了多次,但也只会被处理一次?下面进行讲解,FIFO 并发存储过程如图 5-10 所示。</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2F51be3c59j00qd4q02000fc000ks003kc.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>图5-10 FIFO并发存储过程</p><p>如图 5-10 所示,在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 字段中,同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 字段中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,当前 FIFO 队列中的 dirty 字段中存有 1、2、3 元素,processing 字段存有 1 元素。在 goroutine A 通过 Done 方法标记处理完成后,如果 dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。需要注意的是,dirty 和 processing 字段都是用 Hash Map 数据结构实现的,所以不需要考虑无序,只保证去重即可。</p><p>2. 延迟队列</p><p>延迟队列,基于 FIFO 队列接口封装,在原有功能上增加了 AddAfter 方法,其原理是延迟一段时间后再将元素插入 FIFO 队列。延迟队列数据结构如下:</p><p></p><p>AddAfter 方法会插入一个 item(元素)参数,并附带一个 duration(延迟时间)参数,该 duration 参数用于指定元素延迟插入 FIFO 队列的时间。如果 duration 小于或等于 0,会直接将元素插入 FIFO 队列中。</p><p>delayingType 结构中最主要的字段是 waitingForAddCh,其默认初始大小为 1000,通过 AddAfter 方法插入元素时,是非阻塞状态的,只有当插入的元素大于或等于 1000 时,延迟队列才会处于阻塞状态。waitingForAddCh 字段中的数据通过 goroutine 运行的 waitingLoop 函数持久运行。延迟队列运行原理如图 5-11 所示。</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2F1aj00qd4q02000fc000hs006hc.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>图5-11 延迟队列运行原理</p><p>如图 5-11 所示,将元素 1 放入 waitingForAddCh 字段中,通过 waitingLoop 函数消费元素数据。当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入 FIFO 队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入 FIFO 队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。</p><p>3. 限速队列</p><p>限速队列,基于延迟队列和 FIFO 队列接口封装,限速队列接口(RateLimitingInterface)在原有功能上增加了 AddRateLimited、Forget、NumRequeues 方法。限速队列的重点不在于 RateLimitingInterface 接口,而在于它提供的 4 种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。RateLimiter 数据结构如下:</p><p></p><p>限速队列接口方法说明如下。</p><p><ul><li><strong>When</strong>:获取指定元素应该等待的时间。</li><li><strong>Forget</strong>:释放指定元素,清空该元素的排队数。</li><li><strong>NumRequeues</strong>:获取指定元素的排队数。</li></ul><blockquote>注意:这里有一个非常重要的概念——限速周期。限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之前的时间。如果该元素被 Forget 方法处理完,从清空队列数。<br/></blockquote></p><p>下面会分别详解 WorkQueue 提供的 4 种限速算法,应对不同的场景,这 4 种限速算法分别如下。</p><p><ul><li>令牌桶算法(BucketRateLimiter)。</li><li>排队指数算法(ItemExponentialFailureRateLimiter)。</li><li>计数器算法(ItemFastSlowRateLimiter)。</li><li>混合模式(MaxOfRateLimiter),将多种限速算法混合使用。</li></ul>令牌桶算法</p><p>令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的。令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。令牌桶算法原理如图 5-12 所示。</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2F1b4e8439j00qd4q02000fc000f8009wc.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>图5-12 令牌桶算法原理</p><p>WorkQueue 在默认的情况下会实例化令牌桶,代码示例如下:</p><p></p><p>在实例化 rate.NewLimiter 后,传入 r 和 b 两个参数,其中 r 参数表示每秒往“桶”里填充的 token 数量,b 参数表示令牌桶的大小(即令牌桶最多存放的 token 数量)。我们假定 r 为 10,b 为 100。假设在一个限速周期内插入了 1000 个元素,通过 r.Limiter.Reserve().Delay 函数返回指定元素应该等待的时间,那么前 b(即 100)个元素会被立刻处理,而后面元素的延迟时间分别为 item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。</p><p>排队指数算法</p><p>排队指数算法将相同元素的排队数作为指数,排队数增大,速率限制呈指数级增长,但其最大值不会超过 maxDelay。元素的排队数统计是有限速周期的,一个限速周期是指从执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。如果该元素被 Forget 方法处理完,则清空排队数。排队指数算法的核心实现代码示例如下:</p><p></p><p>该算法提供了 3 个主要字段:failures、baseDelay、maxDelay。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;另外,baseDelay 字段是最初的限速单位(默认为 5ms),maxDelay 字段是最大限速单位(默认为 1000s)。排队指数增长趋势如图 5-13 所示。</p><p class="f_center"><img src="https://nimg.ws.126.net/?url=http%3A%2F%2Fdingyue.ws.126.net%2F2020%2F0708%2F85a78df3j00qd4q02000ic000f60092c.jpg&thumbnail=660x&quality=80&type=jpg"/><br/></p><p>图5-13 排队指数增长趋势</p><p>限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。</p><p><blockquote>注意:在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为 baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过 baseDelay。<br/></blockquote></p><p>们假定 baseDelay 是 1 * time.Millisecond,maxDelay 是 1000 * time.Second。假设在一个限速周期内通过 AddRateLimited 方法插入 10 个相同元素,那么第 1 个元素会通过延迟队列的 AddAfter 方法插入并设置延迟时间为 1ms(即 baseDelay),第 2 个相同元素的延迟时间为 2ms,第 3 个相同元素的延迟时间为 4ms,第 4 个相同元素的延迟时间为 8ms,第 5 个相同元素的延迟时间为 16ms……第 10 个相同元素的延迟时间为 512ms,最长延迟时间不超过 1000s(即 maxDelay)。</p><p>计数器算法</p><p>计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。但 WorkQueue 在此基础上扩展了 fast 和 slow 速率。</p><p>计数器算法提供了 4 个主要字段:failures、fastDelay、slowDelay 及 maxFastAttempts。其中,failures 字段用于统计元素排队数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;而 fastDelay 和 slowDelay 字段是用于定义 fast、slow 速率的;另外,maxFastAttempts 字段用于控制从 fast 速率转换到 slow 速率。计数器算法核心实现的代码示例如下:</p><p></p><p>假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.Second,maxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。</p><p>混合模式</p><p>混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法,代码示例如下:</p><p></p> 

讯享网
小讯
上一篇 2025-05-26 08:51
下一篇 2025-05-09 14:53

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/146401.html