Setting Up a Single-Node RocketMQ on Linux CentOS 8


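1. Base Environment Setup

1.1 Java Environment

Install OpenJDK 8

yum search java | grep jdk # list the JDK versions available through yum
yum install -y java-1.8.0-openjdk # install JDK 8 with yum
java -version # check that the installation succeeded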
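To confirm in a script that the JDK on the PATH really is Java 8, the output of `java -version` can be inspected. This is a minimal sketch: the helper name `is_java8` is hypothetical, and it assumes the first line of the output looks like `openjdk version "1.8.0_..."`.

```shell
# is_java8 is a hypothetical helper (not part of any RocketMQ or JDK tooling):
# it succeeds when the given `java -version` output reports a 1.8.x runtime.
is_java8() {
    printf '%s\n' "$1" | head -n 1 | grep -q 'version "1\.8\.'
}

# Usage: java -version prints to stderr, hence the 2>&1.
if command -v java >/dev/null 2>&1; then
    if is_java8 "$(java -version 2>&1)"; then
        echo "Java 8 detected"
    else
        echo "Java is installed but does not report 1.8.x"
    fi
fi
```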


Install Maven

yum -y install maven

2. Installation

First, the official documentation:

Apache RocketMQ website: https://rocketmq.apache.org/

Apache RocketMQ official documentation: https://rocketmq.apache.org/docs/quick-start/

Apache RocketMQ GitHub documentation (Chinese): https://github.com/apache/rocketmq/tree/master/docs/cn

2.1 Download and Build from the Source Release

This mainly follows the official Apache RocketMQ documentation: https://rocketmq.apache.org/docs/quick-start/

wget https://mirror.bit.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip # download the 4.8.0 source release
unzip rocketmq-all-4.8.0-source-release.zip # unpack
cd rocketmq-all-4.8.0/
mvn -Prelease-all -DskipTests clean install -U # build
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

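2.2 Modify the Configuration

1) Add the absolute path of the Java installation

Two files need this change:

tools.sh: the entry here is needed for creating topics; without it, topic creation may fail.

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.275.b01-1.el8_3.x86_64/jre/lib/ext"

runbroker.sh: this script starts the broker; without the entry, request signing may fail when ACL is enabled.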

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.275.b01-1.el8_3.x86_64/jre/lib/ext"

Both files are located in: /<path where RocketMQ was unpacked>/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin
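The JDK directory in the lines above is specific to one OpenJDK build, so it will differ on other machines. A minimal sketch for looking up the matching `lib/ext` directory follows; the helper name `java_ext_dir` is hypothetical, and it assumes the `java` on the PATH is a symlink chain ending in `<jre>/bin/java`, which is how the yum package is usually laid out.

```shell
# java_ext_dir is a hypothetical helper: given the path of a java binary
# (possibly a symlink), print the lib/ext directory two levels above the
# resolved binary, i.e. <jre>/bin/java -> <jre>/lib/ext.
java_ext_dir() {
    real_bin=$(readlink -f "$1") || return 1
    printf '%s/lib/ext\n' "$(dirname "$(dirname "$real_bin")")"
}

# Usage: print the directory to append to -Djava.ext.dirs
if command -v java >/dev/null 2>&1; then
    java_ext_dir "$(command -v java)"
fi
```

The printed path can then replace the hard-coded `/usr/lib/jvm/...` entry in tools.sh and runbroker.sh.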

2) Modify the JVM settings

With the default configuration the JVM maximum heap is 8 GB, so it needs to be adjusted to fit the server. See https://rocketmq.apache.org/docs/system-config/ for reference.

tools.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"

runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
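Instead of editing each script by hand, the heap flags can be rewritten with sed. The following is only a sketch: `set_heap` is a hypothetical helper, and it assumes the script sets the heap on a `JAVA_OPT=` line using `-Xms`, `-Xmx` and `-Xmn` flags like the ones shown above.

```shell
# set_heap is a hypothetical helper: rewrite the -Xms/-Xmx/-Xmn values on
# the JAVA_OPT line(s) of a start script, leaving all other flags untouched.
# Arguments: <script> <heap size, e.g. 4g> <young generation size, e.g. 2g>
set_heap() {
    script=$1 heap=$2 young=$3
    tmp=$(mktemp) || return 1
    sed -e "/JAVA_OPT=/s/-Xms[0-9]*[kmgKMG]/-Xms${heap}/" \
        -e "/JAVA_OPT=/s/-Xmx[0-9]*[kmgKMG]/-Xmx${heap}/" \
        -e "/JAVA_OPT=/s/-Xmn[0-9]*[kmgKMG]/-Xmn${young}/" \
        "$script" > "$tmp" && cat "$tmp" > "$script"
    rm -f "$tmp"
}

# Usage, run from the RocketMQ directory:
#   set_heap bin/runbroker.sh 4g 2g
#   set_heap bin/runserver.sh 1g 512m
```

Writing back with `cat` rather than `mv` keeps the script's original permissions.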

3) Modify the broker configuration

File location: /<path where RocketMQ was unpacked>/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf

This mainly follows https://github.com/apache/rocketmq/blob/release-4.8.0/docs/cn/acl/user_guide.md

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
# allow topics to be created automatically
autoCreateTopicEnable=true
# allow subscription groups to be created automatically
autoCreateSubscriptionGroup=true
# if acl is open, the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX
namesrvAddr=XX.XX.XX.XX:9876
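Before starting the broker it can help to read back the values that matter for remote access. A small sketch, assuming broker.conf is a plain key=value file; `conf_get` is a hypothetical helper, not a RocketMQ tool.

```shell
# conf_get is a hypothetical helper: print the value of a key from a
# key=value file, skipping commented-out lines and tolerating spaces
# around '='. If the key appears more than once, the last value wins.
conf_get() {
    sed -n "/^[[:space:]]*$2[[:space:]]*=/{s/^[^=]*=[[:space:]]*//;s/[[:space:]]*\$//;p;}" "$1" | tail -n 1
}

# Usage, run from the RocketMQ directory:
if [ -f conf/broker.conf ]; then
    for key in brokerIP1 namesrvAddr listenPort aclEnable; do
        echo "$key=$(conf_get conf/broker.conf "$key")"
    done
fi
```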

4) Modify the access-control store

See https://github.com/apache/rocketmq/blob/release-4.8.0/docs/cn/acl/user_guide.md
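As a rough illustration of what this step involves, the sketch below writes a single-account ACL file. Everything in it is an assumption to check against the user guide linked above: the field names are the ones I recall from the 4.8.0 guide for conf/plain_acl.yml, `write_acl_file` is a hypothetical helper, and `topicA` and `groupA` are example names only.

```shell
# write_acl_file is a hypothetical helper: write a minimal one-account ACL
# file. Field names are assumed from the 4.8.0 ACL user guide; the topic and
# group entries are placeholders to replace with your own.
# Arguments: <target file> <access key> <secret key>
write_acl_file() {
    cat > "$1" <<EOF
accounts:
- accessKey: $2
  secretKey: $3
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=PUB|SUB
  groupPerms:
  - groupA=PUB|SUB
EOF
}

# Usage, run from the RocketMQ directory (back up the shipped file first):
#   cp conf/plain_acl.yml conf/plain_acl.yml.bak
#   write_acl_file conf/plain_acl.yml RocketMQ '<your secret key>'
```

The access key and secret key written here have to match the ones the client passes to `SessionCredentials`.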

3. Startup

Before starting mqnamesrv and mqbroker, create the log files their output will be redirected to:

cd ~
mkdir logs
cd logs
mkdir rocketmqlogs
cd rocketmqlogs
touch broker.log
touch namesrv.log
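The same preparation can be made safe to repeat, since `mkdir` without `-p` fails when the directory already exists. A minimal sketch; `prepare_log_dir` is a hypothetical helper name.

```shell
# prepare_log_dir is a hypothetical helper: create the log directory
# (including missing parents) and the two log files; safe to run repeatedly.
prepare_log_dir() {
    mkdir -p "$1" && touch "$1/broker.log" "$1/namesrv.log"
}

# Usage:
#   prepare_log_dir "$HOME/logs/rocketmqlogs"
```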

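Start mqnamesrv

nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/mqnamesrv.log 2>&1 & # mind the directory the command is run from

Start mqbroker

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf > ~/logs/rocketmqlogs/broker.log 2>&1 & # mind the directory the command is run from

Both commands send stderr to the same file as stdout with 2>&1; redirecting each stream to the file separately would let the two overwrite each other's output.

After a successful start, the log file contains: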

…… boot success. serializeType=JSON and name server is localhost:9876
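Because both processes are started in the background, a script has to wait for that line before moving on. A minimal sketch that polls the log file; `wait_for_boot` is a hypothetical helper, and the 30-second default is an arbitrary choice.

```shell
# wait_for_boot is a hypothetical helper: poll a log file until it contains
# "boot success" or the timeout (in seconds, default 30) runs out.
# Returns 0 when the line was found, 1 on timeout.
wait_for_boot() {
    log=$1
    remaining=${2:-30}
    while :; do
        grep -q 'boot success' "$log" 2>/dev/null && return 0
        [ "$remaining" -le 0 ] && return 1
        sleep 1
        remaining=$((remaining - 1))
    done
}

# Usage:
#   wait_for_boot ~/logs/rocketmqlogs/mqnamesrv.log 30 && echo "name server is up"
#   wait_for_boot ~/logs/rocketmqlogs/broker.log 60 && echo "broker is up"
```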

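4. Testing Message Sending and Receiving

4.1 Local test on the RocketMQ host

To test locally on the host where RocketMQ is installed, ACL has to be turned off in the broker configuration from section 2 (aclEnable in broker.conf).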
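Switching ACL off and on again can be scripted. The sketch below is an assumption-laden convenience, not a RocketMQ tool: `set_acl_enabled` is a hypothetical helper that edits the `aclEnable` line in a key=value config file, and the broker has to be restarted for the change to take effect.

```shell
# set_acl_enabled is a hypothetical helper: set aclEnable=<true|false> in a
# broker config file, replacing an existing entry or appending a new one.
# Arguments: <config file> <true|false>
set_acl_enabled() {
    conf=$1 value=$2
    tmp=$(mktemp) || return 1
    if grep -q '^[[:space:]]*aclEnable[[:space:]]*=' "$conf"; then
        sed "s/^[[:space:]]*aclEnable[[:space:]]*=.*/aclEnable=${value}/" "$conf" > "$tmp"
    else
        { cat "$conf"; printf 'aclEnable=%s\n' "$value"; } > "$tmp"
    fi
    cat "$tmp" > "$conf"
    rm -f "$tmp"
}

# Usage, run from the RocketMQ directory:
#   set_acl_enabled conf/broker.conf false   # before the local quickstart test
#   set_acl_enabled conf/broker.conf true    # before the remote ACL test
```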

Start the producer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

Start the consumer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

4.2 Remote ACL test

The RocketMQ source includes an example that can be used for testing after minor changes:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.example.simple;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AclClient {

    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    private static final String ACL_ACCESS_KEY = "RocketMQ"; // change to the access key you configured

    private static final String ACL_SECRET_KEY = ""; // change to the secret key you configured

    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
        pushConsumer();
        pullConsumer();
    }

    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("groupA", getAclRPCHook());
        producer.setNamesrvAddr("xx.xx.xx.xx:9876"); // name server address
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("topicA",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // send with a 5 second timeout
                SendResult sendResult = producer.send(msg, 5000L);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }

    public static void pushConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
        // the producer above sends to topicA; subscribe to the same topic to receive those messages
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Wrong time format 2017_0422_
        consumer.setConsumeTimestamp("800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                printBody(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    public static void pullConsumer() throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());
        consumer.setNamesrvAddr("xx.xx.xx.xx:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    printBody(pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            // nothing left in this queue, move on to the next one
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static void printBody(PullResult pullResult) {
        printBody(pullResult.getMsgFoundList());
    }

    private static void printBody(List<MessageExt> msg) {
        if (msg == null || msg.size() == 0)
            return;
        for (MessageExt m : msg) {
            if (m != null) {
                System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
            }
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

    // builds the RPC hook that signs every request with the ACL credentials
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
    }
}

 

5. Shutting Down the Services

Stop the broker

sh bin/mqshutdown broker

Stop the name server

sh bin/mqshutdown namesrv

 

 

 
