作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
Springboot开发RocketMq(同步有序消费)
包含内容: 源码,全套工具
作者QQ549710689
详情描述

如遇视频不清晰,请最大化观看演示

官网:http://www.ckplayer.com,版本号:X

以下仅列出部分功能,全部功能请至官网 《手册》查看

单独监听功能:


播放状态:
跳转状态:无
缓冲:100
当前音量:0.8
是否全屏:否
控制栏:显示
还未结束
当前播放时间(秒):0
前置广告状态:
鼠标位置
切换清晰度:
点击监听:
监听截图功能

Custom Tab

1. 本实例使用的RocketMq版本为4.3.2,  JDK>=1.8

2. 启动rocketmq, 在安装目录的bin目录下

    启动 NAMESERVER    start mqnamesrv.cmd

    启动 BROKER              start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

   image.png

   image.png

3. rocketmq jar包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.2</version>
</dependency>

4. application.properties

# 消费者组名
apache.rocketmq.consumer.PushConsumer=PushConsumer2
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer4
# NameServer的地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876

    Topic 默认对应四个队列 queue, 队列中有生产者和消费者, 生产者组成生产者组, 消费者组成消费组

    有相同名称的是属于同一个组的,   一个queue对应一个消费者, 但一个消费者对应多个queue

    setConsumeMessageBatchMaxSize   ----  设置批量消费消息的大小 (1: 一次一条   n: 一条多条)

    消息消费完成后, 如果消费者返回CONSUME_SUCCESS则消息成功被消费

                             如果消费者返回RECONSUME_LATER 则消息在一段时间后会被再次消费


                                                   produce0   ------->   queue0      -------->  consume0    

   ProduceGroup                         produce1   ------->   queue1      -------->  consume1                 ConsumeGroup

                                                    produce2   ------->   queue2      -------->  consume2

                                                    produce3   ------->   queue3      -------->  consume3

5. 生产者代码

@Component
public class RocketMQProduce {
   @Value("${apache.rocketmq.producer.producerGroup}")
   private String producerGroup;
   @Value("${apache.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   //@PostConstruct
   public void defaultMQProducer() {
      try {
         DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
         //producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
         producer.setNamesrvAddr(namesrvAddr);

         producer.start();

         // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
         // "TagE" };

         for (int i = 1; i <= 1; i++) {
            Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                  Integer id = (Integer) arg;
                  int index = id % mqs.size();
                  return mqs.get(index);
               }
            }, Integer.valueOf(i));

            System.out.println(sendResult);
         }

         producer.shutdown();
      } catch (MQClientException e) {
         e.printStackTrace();
      } catch (RemotingException e) {
         e.printStackTrace();
      } catch (MQBrokerException e) {
         e.printStackTrace();
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }
}

6. 消费者代码    MessageListenerOrderly 

     对于同一个队列, 队列中的消息是同步消费的

     如果是多个阶列, 多个队列中的消息是同步的

@Component
public class RocketMQConsumer {
   @Value("${apache.rocketmq.consumer.PushConsumer}")
   private String consumerGroup;

   @Value("${apache.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   @PostConstruct
   public void defaultMQPushConsumer() throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
        consumer.setNamesrvAddr(namesrvAddr);

        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //一次只处理一条消息, 处理成功后再提交, 失败可以针对单条进行重试
        //如果这里设置多条,失败需要对多条进行重试
        consumer.setConsumeMessageBatchMaxSize(1);

        consumer.subscribe("TopicOrderTest", "order_1");


        /**
         * 注意: 只有被放在只一个queue中的消息才能被有序消费
         *       这里的顺序消费的维度是  同一个topic对应的一个queue, 一个queue只会被一个线程处理
         *       ---- queue1 ----> thread1
         *       ---- queue2 ----> thread2
         *       ---- queue3 ----> thread3
         *       ---- queue4 ----> thread4
         */
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // 设置自动提交
                context.setAutoCommit(true);
                System.out.println("msgs.size = " + msgs.size());
                for (MessageExt msg : msgs) {
                    System.out.println("内容:" + new String(msg.getBody()) + "  线程名称:" + Thread.currentThread().getName() + "  " + msg);
                    int reconsumeTimes = msg.getReconsumeTimes();

                    //第四次成功
                    if(reconsumeTimes >= 10){
                        //表明消息被消费
                        return ConsumeOrderlyStatus.SUCCESS;
                    }else{
                        //消息未被消费. 需要重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }

                try {
                    TimeUnit.SECONDS.sleep(2L);
                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
                ;

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer1 Started.");

   }
}

Home