【RocketMQ】RocketMQ发送不同类型消息

news/2024/9/29 7:40:24 标签: java-rocketmq, rocketmq, java, springboot

🎯 导读:本文介绍了RocketMQ消息队列系统中的几种消息发送模式及其应用场景,包括同步消息、异步消息以及事务消息。同步消息确保了消息的安全性,但牺牲了一定的性能;异步消息提高了响应速度,适用于对响应时间敏感的场景;事务消息则保证了消息与本地事务的一致性,适用于需要预执行业务逻辑以决定消息是否发送的场景。此外,文章还探讨了Topic与Tag的应用策略,以及如何利用自定义Key来方便消息的查询和去重,提供了丰富的代码示例帮助理解。

文章目录

  • RocketMQ发送同步消息*
  • RocketMQ发送异步消息*
    • 异步消息生产者
    • 异步消息消费者
  • RocketMQ发送单向消息
    • 单向消息生产者
    • 单向消息消费者
  • RocketMQ发送延迟消息*
    • 延迟消息生产者
    • 延迟消息消费者
  • RocketMQ发送顺序消息
    • 场景分析
    • 定义消息实体
    • 顺序消息生产者
    • 顺序消息消费者
  • RocketMQ发送批量消息
    • 批量消息生产者
    • 批量消息消费者
  • RocketMQ发送事务消息(不够Seata方便)
    • 事务消息的发送流程
    • 事务消息生产者
    • 事务消息消费者
    • 测试结果
  • RocketMQ发送带标签的消息*(消息过滤)
    • 订阅关系一致
    • 标签消息生产者
    • 标签消息消费者
    • Topic 和 Tag 的应用推荐(官方推荐)
  • 发送消息携带自定义Key
    • 携带Key好处
    • 携带 key 消息生产者
    • 携带 key 消息消费者

使用*标注的为常用的消息类型

RocketMQ发送同步消息*

在这里插入图片描述

方法有返回Result的是同步消息,可以参考快速入门案例的实现

  • 同步消息发送过后会有一个返回值(MQ 服务器接收到消息后返回的一个确认),这种方式非常安全,但是性能没有那么高
  • 在 MQ 集群中,要等到所有的从机都复制了消息以后才会返回(要等很久)
  • 应用场景:重要的消息可以选择这种方式

在这里插入图片描述

RocketMQ发送异步消息*

  • 异步消息用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker响应的场景
  • 发送完以后会有一个异步消息通知告诉生产者消息是否发送成功

异步消息生产者

java">@Test
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
    producer.send(message, new SendCallback() {
        // 异步回调方法
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        @Override
        public void onException(Throwable e) {
            System.err.println("发送失败:" + e.getMessage());
        }
    });
    System.out.println("我先执行");
    // 挂起jvm
    System.in.read();
}

在这里插入图片描述

异步消息消费者

java">@Test
public void testAsyncConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送单向消息

  • 这种方式主要用在不关心发送结果的场景(没有同步或者异步回调,不在乎消息是否发送成功),例如日志信息的发送
  • 这种方式吞吐量很大,但是存在消息丢失的风险

单向消息生产者

java">@Test
public void testOnewayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("单向消息").getBytes());
    // 发送单向消息
    producer.sendOneway(msg);
    // 关闭实例
    producer.shutdown();
}

单向消息消费者

消费者和上面一样

RocketMQ发送延迟消息*

  • 消息放入 MQ 后,过一段时间,才会被消费者监听到
  • 在下单业务中,提交一个订单后,发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单,释放库存。类似场景还有 7 天自动收货

延迟消息生产者

RocketMQ 4.x 版本不支持任意时间的延时,只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟

在这里插入图片描述

java">@Test
public void testDelayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("ms-consumer-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("延迟消息").getBytes());
    // 给这个消息设定一个延迟级别,每个级别对应一个时间
    // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);
    // 发送单向消息
    producer.send(msg);
    // 打印时间
    System.out.println(new Date());
    // 关闭实例
    producer.shutdown();
}

RocketMQ 5.x 版本支持任意时间的延时(使用时间轮算法)

java">Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
// 直接设置延迟多少秒
message.setDelayTimeSec(500);
// 设置延迟多少毫秒
// message.setDelayTimeMs(100);
// 发延迟消息
producer.send(message);

延迟消息消费者

延时消息会有一点小误差,不完全准时

java">/**
 * 发送时间Fri Apr 21 16:19:54 CST 2023
 * 收到消息了Fri Apr 21 16:20:20 CST 2023
 * --------------
 * 发送时间Fri Apr 21 16:21:08 CST 2023
 * 收到消息Fri Apr 21 16:21:18 CST 2023
 *
 * @throws Exception
 */
@Test
public void msConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderMsTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("收到消息了" + new Date());
            System.out.println(new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送顺序消息

  • 消息有序指的是可以按照消息的发送顺序来消费(FIFO)
  • RocketMQ 可严格保证消息有序,例如下单之后,先发送短信、再发货

【有序类型】

  • 分区有序(可以直接通过MessageListenerOrderly实现)
  • 全局有序(MessageListenerOrderly + 设置Broker队列数量为1)

可能大家会有疑问,mq本身不就是FIFO吗?

答:一个broker中有四个queue,消费默认是并发的,线程之间有竞争关系,不能确保顺序

【顺序消费的原理解析】

  • 默认消息发送时采取Round Robin轮询方式把消息发送到不同的queue(分区队列)
  • 消费消息的时候,从多个queue上拉取消息,消费不能保证全局有序,只能保证分区有序(每个queue的消息都是有序的)

在这里插入图片描述

  • 但如果控制消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,就保证了全局有序(发送和消费参与的队列只有一个)

场景分析

模拟一个订单的发送流程,创建两个订单,发送的消息分别是

  • 订单号1000111 发消息流程 下订单->物流->签收
  • 订单号1000222 发消息流程 下订单->物流->拒收

即实现分区有序

定义消息实体

java">@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {

    private String orderSn;
    private Integer userId;
    private String desc; // 下单 短信 物流
}

顺序消息生产者

java">private List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("1000111", 1, "下单"),
        new MsgModel("1000111", 1, "短信"),
        new MsgModel("1000111", 1, "物流"),
        new MsgModel("1000222", 2, "下单"),
        new MsgModel("1000222", 2, "短信"),
        new MsgModel("1000222", 2, "物流")
);

@Test
public void orderlyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    // 发送顺序消息  发送时要确保有序 并且要发到同一个队列下面去
    msgModels.forEach(msgModel -> {
        String msgModelString = msgModel.toString();
        Message message = new Message("orderlyTopic", msgModelString.getBytes());
        try {
            producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // arg就是接收到的订单号 orderSn
                            // 在这里 选择队列
                            int hashCode = arg.toString().hashCode();
                            // 根据订单号hashCode,取模放到队列中,订单号一样,放到的队列一样
                            return mqs.get(hashCode % mqs.size());
                        }
                    },
                    // 传递订单号进去
                    msgModel.getOrderSn());

        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    producer.shutdown();
    System.out.println("发送完成");
}

顺序消息消费者

java">@Test
public void orderlyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderlyTopic", "*");
    // MessageListenerConcurrently 并发模式 多线程的  重试16次
    // MessageListenerOrderly 顺序模式 单线程的 无限重试Integer.Max_Value
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.println("线程id:" + Thread.currentThread().getId());
            System.out.println(new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

下面的结果就是分区有序,同一个订单(同一队列)的消息会按照顺序消费,但是不同订单的消息没有顺序约束

在这里插入图片描述

RocketMQ发送批量消息

RocketMQ可以一次性发送一组消息,这一组消息被投递到同一个队列中,会被当做一条消息进行消费

批量消息生产者

java">@Test
public void testBatchProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    // 消息实际上还是分开发的,接收的时候还是一条一条接收就行
    List<Message> msgs = Arrays.asList(
            new Message("TopicTest", "我是一组消息的A消息".getBytes()),
            new Message("TopicTest", "我是一组消息的B消息".getBytes()),
            new Message("TopicTest", "我是一组消息的C消息".getBytes())
    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

三个消息放在一个队列中

在这里插入图片描述

批量消息消费者

消费者还是一条一条来消费

java">@Test
public void testBatchConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送事务消息(不够Seata方便)

事务消息的发送流程

它允许发送方在发送消息之前执行某些业务逻辑,并根据这些业务逻辑的结果来决定消息是否应该被发送。

  • 如果业务逻辑执行成功,则消息会被提交并最终被消费者消费;
  • 如果业务逻辑执行失败或不确定,则消息的状态将保持未知,直到通过回查机制确认其状态为止。

在这里插入图片描述

下图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

在这里插入图片描述

事务消息发送及提交

1、发送消息(half消息)

2、服务端响应消息写入结果(如果写入失败,此时half消息对业务不可见,本地事务不执行)

3、根据发送结果执行本地事务

4、根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况

1、对没有Commit/Rollback的事务消息(pending状态的消息),发起一次“回查”

2、Producer收到回查消息,检查回查消息对应的本地事务的状态

3、根据本地事务状态,重新 Commit 或者 Rollback

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态

  • TransactionStatus.CommitTransaction: 提交事务,允许消费者消费此消息
  • TransactionStatus.RollbackTransaction: 回滚事务,该消息将被删除,不允许被消费
  • TransactionStatus.Unknown: 中间状态,需要检查消息队列来确定状态

事务消息生产者

通过在broker.conf文件中设置如下参数

  • transactionCheckInterval:Broker检查事务消息状态的默认间隔时间(单位为毫秒)。默认Broker每1分钟(60000毫秒)会对未确认的事务消息进行一次状态检查
  • transactionTimeOut:事务消息的有效期,即事务消息在未得到确认前能存在的最长时间
  • transactionCheckMax:事务消息的最大检测次数(默认回查15次)。如果在达到最大检测次数后事务消息的状态仍未得到确认,Broker会默认认为事务已失败,并对消息进行回滚
java">private Random random = new Random();
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**
 * 创建订单,扣减库存
 *
 * @throws Exception
 */
@Test
public void createOrderAndDeductStock() throws Exception {
    // 构建消息体
    TransactionMQProducer producer = new TransactionMQProducer("async-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);

    // 设置事务消息的监听器
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println(sdf.format(new Date()) + "-->执行本地事务:" + new String(msg.getBody()) + ";事务参数:" + arg);

            // 获取订单信息
            String orderInfo = new String(msg.getBody());

            // 做一些业务操作...

            // 检查订单状态
            if (isOrderValid(orderInfo)) {
                System.out.println("订单状态没有问题,消息发送给消费者处理");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.println("订单状态有问题,等会重新 检查本地事务");
                // 返回 UNKNOW ,后面会调用checkLocalTransaction(msg)方法
                return LocalTransactionState.UNKNOW;
            }
        }

        /**
         * 回查,确认上面的业务是否有结果
         * 触发条件:
         * 1、当上面执行本地事务返回结果 UNKNOW 时,或者回查方法也返回 UNKNOW 时,会触发
         * 2、上面操作超过 20s 没有做出一个结果,也就是超时或者卡住了,也会进行回查
         * @param messageExt
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println(sdf.format(new Date()) + "-->检查本地事务:" + new String(messageExt.getBody()));

            // 检查订单状态
            String orderInfo = new String(messageExt.getBody());
            if (isOrderValid(orderInfo)) {
                System.out.println("订单状态没有问题,消息发送给消费者处理");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.println("订单状态还是有问题,这个消息不要了");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    });

    producer.start();

    for (int i = 0; i < 3; i++) {
        String orderId = UUID.randomUUID().toString().replace("-", "");
        int amount = random.nextInt(10) + 1;
        // 构建订单信息
        String orderInfo = "订单号:" + orderId + " 数量:" + amount;

        // 尝试保存订单信息
        boolean saveOrderSuccess = saveOrder(orderInfo);

        // 发送事务消息
        Message message = new Message("Affair_Topic", orderInfo.getBytes());
        System.out.println(sdf.format(new Date()) + "-->发送事务消息:" + orderInfo);
        producer.sendMessageInTransaction(message, "hahaha");

    }
    System.in.read();
}


private boolean saveOrder(String orderInfo) {
    // 模拟保存订单信息到数据库
    // 假设保存成功
    return true;
}

private boolean isOrderValid(String orderInfo) {
    // 模拟检查订单状态
    // 假设订单有效
    int random = this.random.nextInt(2);
    return random == 1; // 应该替换为实际的订单状态检查逻辑
}

事务消息消费者

java">@Test
public void testTransactionConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("Affair_Topic", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(sdf.format(new Date()) + "-->" + Thread.currentThread().getName() + " 执行消费:" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

测试结果

在这里插入图片描述

RocketMQ发送带标签的消息*(消息过滤)

  • RocketMQ提供消息过滤功能,通过tag或者key进行区分。同一个主题对应多个tag。我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,通过过滤来区别对待
  • 场景:订单主题中。有的消息是关于生鲜,需要及时配送;有的消息是关于服装,可以慢一点发货。不同标签的处理逻辑不同

在这里插入图片描述

订阅关系一致

  • 订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag(个人实测,一个标签最好对应一个标签,不然信息会被过滤掉,不被正常消费),这种记录被称为订阅关系。
  • 订阅关系一致:同一个消费者组下所有消费者实例所订阅的 Topic、Tag 必须完全一致。如果订阅关系(消费者组名 - Topic - Tag)不一致,会导致消费消息紊乱,甚至消息丢失。

在这里插入图片描述

标签消息生产者

java">@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
    Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
    producer.send(message);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}

标签消息消费者

java">/**
 * vip1
 *
 * @throws Exception
 */
@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 主题,标签(写 * 是所有标签)
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}


/**
 * vip1 || vip2
 *
 * @throws Exception
 */
@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

Topic 和 Tag 的应用推荐(官方推荐)

总结:不同的业务应该使用不同的Topic,如果是相同的业务里面有不同的表现形式,我们要使用tag进行区分,可以从以下几个方面进行判断:

1、消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分

2、业务是否相关联:淘宝交易消息、京东物流消息使用不同的 Topic 进行区分;同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分

3、消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分

4、消息量级是否相当:有些业务消息虽然量小但是实时性要求高(如生鲜订单),如果跟某些万亿量级的消息使用同一个 Topic,可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

发送消息携带自定义Key

在 RocketMQ 中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或key来进行查询

在这里插入图片描述

携带Key好处

  • 方便查阅
  • 方便去重

携带 key 消息生产者

java">@Test
public void keyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

携带 key 消息消费者

java">@Test
public void keyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("keyTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody()));
            // 获取key
            System.out.println("我们业务的标识:" + messageExt.getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

在这里插入图片描述


http://www.niftyadmin.cn/n/5682602.html

相关文章

一键自动化配置OpenHarmony编译环境

一、概述 本工程旨在对Ubuntu一键初始化配置环境&#xff0c;解决OpenHarmony的编译依赖问题&#xff0c;基于本脚本配置后配合一键下载OpenHarmony代码便能轻松掌控OpenHarmony的下载、编译。 当前建议使用稳定分支Itopen-2.0-Release&#xff0c;该分支是经过多次测试OK的&…

探索光耦:光耦在电脑电源中的应用及其重要性

随着计算机技术的飞速发展&#xff0c;电脑已成为现代生活和工作中不可或缺的工具。无论是日常办公、游戏娱乐还是复杂的图像处理&#xff0c;电脑电源的稳定性和安全性都至关重要。作为电脑电源的核心部件之一&#xff0c;光耦&#xff08;光电耦合器&#xff09;在提升电源性…

论文笔记:iCaRL: Incremental Classifier and Representation Learning

1. Contribution 提出了一种新的训练策略&#xff0c;iCaRL&#xff1a;允许以增量方式学习&#xff1a;只需要同时存在一小部分类别的训练数据&#xff0c;新类别可以逐步添加。同时学习分类器和数据表示&#xff1a;iCaRL能够同时学习强大的分类器和数据表示&#xff0c;这与…

grafana加载缓慢解决方案

背景 目前随着数据和图表的逐渐增多&#xff0c;Grafana 页面加载速度明显变慢&#xff0c;严重影响了用户体验&#xff0c;几次都有骂娘的冲动.&#xff0c;因此我们需要对 Grafana 进行优化&#xff0c;以提升加载性能。 对于速度优化&#xff0c;我们可以从以下方面进行入…

Scala第一天

1、 Scala 学习 在bg中创建Scala模块&#xff0c;导入3个org.scala-lang依赖 和 1个Java Compiler&#xff08;Java 编译的插件&#xff09; 、1个Scala Compiler&#xff08;Scala 编译的插件&#xff09; (1) java类是class定义的&#xff0c;scala是object定义的 def : 定义…

智能硬件语音交互接入大模型知识库的排错指引

前言 前篇讲了把大模型知识库接入到聆思CSK6大模型开发板的文章&#xff0c;这篇讲一下配置失败时该怎么去定位问题和解决。 阅读这篇文章前建议先看&#xff1a;三步把知识库接到智能语音硬件上-CSDN博客 一、排错流程顺序参考 二、云端鉴权问题处理 原因1&#xff1a;聆思平…

【优选算法】(第八篇)

目录 串联所有单词的⼦串&#xff08;hard&#xff09; 题目解析 讲解算法原理 编写代码 最⼩覆盖⼦串&#xff08;hard&#xff09; 题目解析 讲解算法原理 编写代码 串联所有单词的⼦串&#xff08;hard&#xff09; 题目解析 1.题目链接&#xff1a;. - 力扣&#…

docker pull报错:dial tcp: no such host

有一段时间没用docker了&#xff0c;今天使用docker下载镜像竟然报错&#xff0c;而且是莫名其妙的错误&#xff0c;奔走相告&#xff0c;避免后来者踩坑&#xff01; Error response from daemon: Get "https://mirror.aliyuncs.com/v2/": dial tcp: lookup mirror…