1. 本实例使用的RocketMq版本为4.3.2, JDK>=1.8
2. 启动rocketmq, 在安装目录的bin目录下
启动 NAMESERVER start mqnamesrv.cmd
启动 BROKER start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
3. rocketmq jar包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
4. application.properties
# 消费者组名 apache.rocketmq.consumer.PushConsumer=PushConsumer2 # 生产者的组名 apache.rocketmq.producer.producerGroup=Producer4 # NameServer的地址 apache.rocketmq.namesrvAddr=127.0.0.1:9876
Topic 默认对应四个队列 queue, 队列中有生产者和消费者, 生产者组成生产者组, 消费者组成消费组
有相同名称的是属于同一个组的, 一个queue对应一个消费者, 但一个消费者对应多个queue
setConsumeMessageBatchMaxSize ---- 设置批量消费消息的大小 (1: 一次一条 n: 一条多条)
消息消费完成后, 如果消费者返回CONSUME_SUCCESS则消息成功被消费
如果消费者返回RECONSUME_LATER 则消息在一段时间后会被再次消费
produce0 -------> queue0 --------> consume0
ProduceGroup produce1 -------> queue1 --------> consume1 ConsumeGroup
produce2 -------> queue2 --------> consume2
produce3 -------> queue3 --------> consume3
5. 生产者代码
@Component public class RocketMQProduce { @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; //@PostConstruct public void defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); try { producer.start(); StopWatch stop = new StopWatch(); stop.start(); /*for(int i = 0; i< 1; i++) { SendResult result = producer.send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); }*/ for (int i = 0; i < 10; i++) { String messageContent = "消息:" + i; Message message = new Message("TopicTestA", "push", messageContent.getBytes(RemotingHelper.DEFAULT_CHARSET)); //send的最后一个参数指定发送给哪个消息队列 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, Integer.valueOf(i)); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } stop.stop(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (RemotingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MQBrokerException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { producer.shutdown(); } } }
6. 消费者代码 MessageListenerConcurrently 同步消费消息,而且是无序的
@Component public class RocketMQConsumer { @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //设置最多一次消费多少条 consumer.setConsumeMessageBatchMaxSize(1); consumer.setNamesrvAddr(namesrvAddr); // consumer.setMessageModel(MessageModel.CLUSTERING); try { consumer.subscribe("TopicTestA", "push"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);// 输出消息内容 String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); /*System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody + " " + Thread.currentThread().getName());*/ int reconsumeTimes = messageExt.getReconsumeTimes(); //消费次数为2时,将消息消费掉 if(reconsumeTimes >= 2){ System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody + " " + Thread.currentThread().getName() + " 消费次数:" + reconsumeTimes); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else{ System.out.println("++++消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody + " " + Thread.currentThread().getName() + " 消费次数:" + reconsumeTimes); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功 }); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }