阻塞队列有几种(阻塞队列有几种形式)

阻塞队列有几种(阻塞队列有几种形式)p strong 一 应用场景 strong p 目前系统中有很多需要用到延时处理的功能 支付超时取消 排队超时 短信 微信等提醒延迟发送 token 刷新 会员卡过期等等 通过延时处理 极大地节省系统的资源 不必轮询数据库处理任务 p p 目前大部分功能通过定时任务完成

大家好,我是讯享网,很高兴认识大家。



 <p> <strong>一、应用场景</strong></p> 

讯享网

讯享网目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大地节省系统的资源,不必轮询数据库处理任务。</p> 

目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描330的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!</p> 

讯享网<strong>二、演示处理方式调研</strong></p> 

<strong>1.DelayQueue</strong></p> 

讯享网<strong>实现方式:</strong></p> 

jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。</p> 

讯享网当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。</p> 

<strong>存在的问题:</strong></p> 

讯享网单机运行,系统宕机后,无法进行有效的重试</p> 

没有执行记录和备份</p> 

讯享网没有重试机制</p> 

系统重启时,会将任务清空!</p> 

讯享网不能分片消费</p> 

<strong>优势:</strong> 实现简单,无任务时阻塞,节省资源,执行时间准确</p> 

讯享网<strong>2.延迟队列mq</strong></p> 

实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq、jmq都可以设置延迟消费时间。RabbitMq通过将消息设置过期时间,放入私信队列进行消费实现。</p> 

讯享网<strong>存在的问题:</strong>时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列</p> 

<strong>优点:</strong>依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机</p> 

讯享网<strong>3.定时任务</strong></p> 

通过定时任务轮询符合条件的数据</p> 

讯享网<strong>缺点:</strong></p> 

必须要读业务数据库,对数据库造成一定的压力,</p> 

讯享网存在延时</p> 

一次扫描数据量过大时,占用过多的系统资源。</p> 

讯享网无法分片消费</p> 

 <strong>优点:</strong></p> 

讯享网消费失败后,下次还能继续消费,具备重试能力,</p> 

消费能力稳定</p> 

讯享网<strong>4.redis</strong></p> 

任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列</p> 

讯享网<strong>优点:</strong></p> 

查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高</p> 

讯享网redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可</p> 

无惧机器重启</p> 

讯享网分布式消费</p> 

<strong>缺点:</strong></p> 

讯享网受限于redis性能,并发10W</p> 

多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。</p> 

讯享网<strong>5. 时间轮</strong></p> 

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。</p> 

讯享网<strong>缺点:</strong>不适合分布式服务的使用,宕机后,会丢失任务。</p> 

<img src='https://file1.elecfans.com//web2/M00/99/78/wKgZomTngaiAEguUAABUS7uivn8952.jpg' alt='看门狗' /></p> 

讯享网<strong>三、实现目标</strong></p> 

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。</p> 

讯享网消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。</p> 

Client支持丰富:支持多重语言。</p> 

讯享网高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。</p> 

实时性:允许存在一定的时间误差。</p> 

讯享网支持消息删除:业务使用方,可以随时删除指定消息。</p> 

支持消费查询</p> 

讯享网支持手动重试</p> 

对当前异步事件的执行增加监控</p> 

讯享网<strong>四、架构设计</strong></p> 

<img src='https://file1.elecfans.com//web2/M00/99/78/wKgZomTngaiASu1IAAIkKMoIofE453.png' alt='看门狗' /></p> 

讯享网<strong>五、延迟组件实现方式</strong></p> 


讯享网

<strong>1.实现原理</strong></p> 

讯享网目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,</p> 

发送延迟任务时,会根据时间戳+机器ip+queueName+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。</p> 

讯享网通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。</p> 

监控方通过集成ump</p> 

讯享网消费记录通过redis备份+数据库持久化完成。</p> 

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。</p> 

讯享网<strong>2.消息结构</strong></p> 

每个Job必须包含以下几个属性:</p> 

讯享网Topic:Job类型,即QueueName</p> 

Id:Job的唯一标识。用来检索和删除指定的Job信息。</p> 

讯享网Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)</p> 

Body:Job的内容,供消费者做具体的业务处理,以json格式存储。</p> 

讯享网traceId:发送线程的traceId,待后续pfinder支持设置traceId后,可与发送线程公用同一个traceiD,便于日志追踪</p> 

具体结构如下图表示:</p> 

讯享网<img src='https://file1.elecfans.com//web2/M00/99/78/wKgZomTngaiAHK9wAAAg4-PgfXM181.png' alt='看门狗' /></p> 

TTR的设计目的是为了保证消息传输的可靠性。</p> 

讯享网<strong>3.数据流转及流程图</strong></p> 

<img src='https://file1.elecfans.com//web2/M00/99/78/wKgZomTngaiAQM96AADlTjHK-9Q330.png' alt='看门狗' /></p> 

讯享网基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁</p> 

1)支持应用只发布,不消费,达到消息队列的功能。</p> 

讯享网2)支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。</p> 

3)通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。 </p> 

讯享网4)支持设置超时时间配置,防止消费线程执行过久</p> 

瓶颈:消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。</p> 

讯享网可能出现的情况:因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。</p> 

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。</p> 

讯享网<strong>六、demo示例</strong></p> 

<strong>增加配置文件</strong></p> 

讯享网判断是否开启jd.event.enable:true</p> 

 </p> 

讯享网 
 
   
  
  
    
  
    com.jd.car 
   
  
    
  
    senna-event 
   
  
    
  
    1.0-SNAPSHOT 
   
 
   
 
配置
 jd: senna: event: enable: true queue: retryEventQueue: bucketNum: 1 handleBean: retryHandle
消费代码:
讯享网 package com.jd.car.senna.admin.event; import com.jd.car.senna.event.EventHandler; import com.jd.car.senna.event.annotation.SennaEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; / 
 
   
 
  • @author zhangluyao
  • @description
  • @create 2022-02-21-9:54 下午 */ @Slf4j @Component(“retryHandle”) public class RetryQueueEvent extends EventHandler {
@Override protected void onHandle(String key, String eventType) { log.info(“Handler开始消费:{}”, key); } @Override protected void onDelayHandle(String key, String eventType) { log.info(“delayHandler开始消费:{}”, key); } }

 </p> 

讯享网<strong>注解形式:</strong></p> 

 </p> 

讯享网 package com.jd.car.senna.admin.event; import com.jd.car.senna.event.EventHandler; import com.jd.car.senna.event.annotation.SennaEvent; import lombok.extern.slf4j.Slf4j; / 
 
   
 
  • @author zhangluyao
  • @description
  • @create 2022-02-21-9:54 下午 */ @Slf4j @SennaEvent(queueName = “testQueue”, bucketNum = 5,delayBucketNum = 5,delayEnable = true) public class TestQueueEvent extends EventHandler {
@Override protected void onHandle(String key, String eventType) { log.info(“Handler开始消费:{}”, key); } @Override protected void onDelayHandle(String key, String eventType) { log.info(“delayHandler开始消费:{}”, key); } }

 </p> 

讯享网<strong>发送代码:</strong></p> 

 </p> 

讯享网 package com.jd.car.senna.admin.controller; import com.jd.car.senna.event.queue.IEventQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.CompletableFuture; / 
 
   
 
  • @author zly */ @RestController @Slf4j public class DemoController {
@Lazy @Resource(name = “testQueue”) private IEventQueue eventQueue; @ResponseBody @GetMapping(“/api/v1/demo”) public String demo() { log.info(“发送无延迟消息”); eventQueue.push(“no delay 5000 millseconds message 3”); return “ok”; } @ResponseBody @GetMapping(“/api/v1/demo1”) public String demo1() { log.info(“发送延迟5秒消息”); eventQueue.push(” delay 5000 millseconds message,name”,1000*5L); return “ok”; } @ResponseBody @GetMapping(“/api/v1/demo2”) public String demo2() { log.info(“发送延迟到2022-04-02 00:00:00执行的消息”); eventQueue.push(” delay message,name to 2022-04-02 00:00:00”, new Date(00)); return “ok”; } }

 </p> 

讯享网<br /><br /><br /><br /> 审核编辑:刘清</p> 
小讯
上一篇 2025-05-14 18:14
下一篇 2025-05-14 18:32

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/199345.html