本文介绍: 比如网络带宽为可以支持一次性发送8M的数据包,如果数据包确定不会超过8M,那么我们可以除以每条消息的大小(粗略估算),然后会得到一个数值,这个数值再取70%-80%留一定的缓冲空间。发送消息是需要网络连接的如果我们单条发送吞吐量可能没有批量发送好。剖来那个发送可以减少网络IO开销,但是也不能一批次发送太多的数据,需要根据每条消息的大小和网络带宽来确定量的数目。如果我们一次性发送的数据超过了8M,就需要对这些消息进行分组发送,保证每一组的数据大小不超过8M,每一组发送的数量逻辑也是按照前面这样来计算。

批量收发实战

发送消息是需要网络连接的如果我们单条发送吞吐量可能没有批量发送好。剖来那个发送可以减少网络IO开销,但是也不能一批次发送太多的数据,需要根据每条消息的大小和网络带宽来确定量的数目。
比如网络带宽为可以支持一次性发送8M的数据包,如果数据包确定不会超过8M,那么我们可以除以每条消息的大小(粗略估算),然后会得到一个数值,这个数值再取70%-80%留一定的缓冲空间。
如果我们一次性发送的数据超过了8M,就需要对这些消息进行分组发送,保证每一组的数据大小不超过8M,每一组发送的数量逻辑也是按照前面这样来计算。
在这里插入图片描述
生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        List<Order> F = OrderBuilder.build(1, "A", "B", "C");
        List<Order> S = OrderBuilder.build(2, "D", "Q");
        List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
            msg.setKeys("test-topic_trace");
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}

消费者1

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

可以看到批量消费的时候没有保证顺序:
在这里插入图片描述

消费者2

public class Consumer2 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        // 使用顺序的方式来消费MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

消费的时候没有产生顺序问题,完全是按照批量发送的顺序:
在这里插入图片描述

原文地址:https://blog.csdn.net/qq_43259860/article/details/135466638

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_56286.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注