作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
Springboot开发RocketMq(消息事务)
包含内容: 源码,全套工具
作者QQ1420527913
详情描述

如遇视频不清晰,请最大化观看演示

官网:http://www.ckplayer.com,版本号:X

以下仅列出部分功能,全部功能请至官网 《手册》查看

单独监听功能:


播放状态:
跳转状态:无
缓冲:100
当前音量:0.8
是否全屏:否
控制栏:显示
还未结束
当前播放时间(秒):0
前置广告状态:
鼠标位置
切换清晰度:
点击监听:
监听截图功能

Custom Tab

1. 本实例使用的RocketMq版本为4.3.2,  JDK>=1.8

2. 启动rocketmq, 在安装目录的bin目录下

    启动 NAMESERVER    start mqnamesrv.cmd

    启动 BROKER              start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

   image.png

   image.png

3. rocketmq jar包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.2</version>
</dependency>

4. application.properties

# 消费者组名
apache.rocketmq.consumer.PushConsumer=PushConsumer2
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer4
# NameServer的地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876

    Topic 默认对应四个队列 queue, 队列中有生产者和消费者, 生产者组成生产者组, 消费者组成消费组

    有相同名称的是属于同一个组的,   一个queue对应一个消费者, 但一个消费者对应多个queue

    setConsumeMessageBatchMaxSize   ----  设置批量消费消息的大小 (1: 一次一条   n: 一条多条)

    消息消费完成后, 如果消费者返回CONSUME_SUCCESS则消息成功被消费

                             如果消费者返回RECONSUME_LATER 则消息在一段时间后会被再次消费


                                                   produce0   ------->   queue0      -------->  consume0    

   ProduceGroup                         produce1   ------->   queue1      -------->  consume1                 ConsumeGroup

                                                    produce2   ------->   queue2      -------->  consume2

                                                    produce3   ------->   queue3      -------->  consume3

5. 事务生产者代码

@Component
public class TransactionProducer {
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    public void transactionProduce() throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 1; i <= 2; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", "transaction"+i, "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, Integer.parseInt("1"));
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        /*for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }*/
        producer.shutdown();
    }
}


6. 消息事务回调提交

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务msg = " + new String(msg.getBody()));

        System.out.println("执行本地事务arg = " + arg);

        String tags = msg.getTags();

        if (tags.equals("transaction2")) {
            System.out.println("======我的操作============,失败了  -进行ROLLBACK");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
        // return LocalTransactionState.UNKNOW
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


7. 消费者代码    

@Component
public class RocketMQConsumer {
   @Value("${apache.rocketmq.consumer.PushConsumer}")
   private String consumerGroup;

   @Value("${apache.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   @PostConstruct
   public void defaultMQPushConsumer() {
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
      //设置最多一次消费多少条
      consumer.setConsumeMessageBatchMaxSize(1);
      consumer.setNamesrvAddr(namesrvAddr);
//    consumer.setMessageModel(MessageModel.CLUSTERING);
      try {
         consumer.subscribe("TopicTestA", "push");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
            try {
               for (MessageExt messageExt : list) {
                  System.out.println("messageExt: " + messageExt);// 输出消息内容
                  String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                  /*System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody
                        + "  " + Thread.currentThread().getName());*/

                  int reconsumeTimes = messageExt.getReconsumeTimes();

                  //消费次数为2时,将消息消费掉
                  if(reconsumeTimes >= 2){
                     System.out.println("----消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody
                           + "  " + Thread.currentThread().getName()
                           + "  消费次数:" + reconsumeTimes);
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }else{
                     System.out.println("++++消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody
                           + "  " + Thread.currentThread().getName()
                           + "  消费次数:" + reconsumeTimes);
                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                  }
               }
            } catch (Exception e) {
               e.printStackTrace();
               return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
         });
         consumer.start();
      } catch (MQClientException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
      }

   }
}


Home