rocketMQ之八 发送批量消息

rocketMQ之八 发送批量消息发送批量消息 批量发送下消息 优点 能提高性能 缺点 一批消息只能有相同的 topic 相同的 waitStroeMsg 不能是延时消息 一批消息的总大小不能超过 4MB public class BatchProduce public static void main String args throws Exception 初始化生产者

大家好,我是讯享网,很高兴认识大家。

发送批量消息

  • 批量发送下消息
    • 优点:能提高性能
    • 缺点
      • 一批消息只能有相同的topic,相同的waitStroeMsgOK
      • 不能是延时消息
      • 一批消息的总大小不能超过4MB
public class BatchProducer { public static void main(String[] args) throws Exception { //初始化生产者 DefaultMQProducer producer = new DefaultMQProducer("producer_group"); //指定nameServer地址 producer.setNamesrvAddr("localhost:9876"); //启动 producer.start(); List<Message> msgs = new ArrayList<>(); for (int i = 0; i < 100; i++) { //创建消息,指定topic,tag和消息体 Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msgs.add(msg); } //批量发送 SendResult result = producer.send(msgs); System.out.println(result); //关闭 producer.shutdown(); } } 

讯享网
  • 针对总长度不能超过4MB,可以调用以下这个工具类
讯享网 public class Listsplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public Listsplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (;nextIndex < messages.size();nextIndex ++){ Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length()+message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length()+entry.getValue().length(); } tmpSize = tmpSize + 20;//增加日志的开销20字节 if (tmpSize > SIZE_LIMIT){ //单条消息超过了最大的限制 //忽略,否则会阻塞分裂的进程 if (nextIndex - currIndex == 0){ //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 nextIndex ++; } break; } if (tmpSize + totalSize > SIZE_LIMIT){ break; }else{ totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex,nextIndex); currIndex = nextIndex; return subList; } } 
  • 生产者就变成了这样
 public class BatchProducer { public static void main(String[] args) throws Exception { //初始化生产者 DefaultMQProducer producer = new DefaultMQProducer("producer_group"); //指定nameServer地址 producer.setNamesrvAddr("localhost:9876"); //启动 producer.start(); List<Message> msgs = new ArrayList<>(); for (int i = 0; i < 100; i++) { //创建消息,指定topic,tag和消息体 Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msgs.add(msg); } Listsplitter listsplitter = new Listsplitter(msgs); while (listsplitter.hasNext()){ List<Message> next = listsplitter.next(); //批量发送 SendResult result = producer.send(next); System.out.println(result); } //关闭 producer.shutdown(); } } 
小讯
上一篇 2025-02-08 13:04
下一篇 2025-01-28 18:20

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/64229.html