rust tokio select!宏详解

简介

本文介绍Tokioselect!用法,重点是使用过程可能遇到的问题比如阻塞问题优先级问题cancel safe问题。在Tokio 中,select!一个宏,用于同时等待多个异步任务,并在其中任意一个任务完成时执行相应的逻辑

基本用法

如下代码演示如何使用 Tokio实现一个异步消息传递系统,其中包括三个无限通道一个关闭通道程序使用select! 宏来等待通道和关闭通道的事件,并在事件发生时执行相应的操作

程序的主要步骤如下

  1. 创建三个无限通道和一个用于传递关闭信号的通道。
  2. 向三个通道中发送一些数据
  3. 开启一个异步任务并在两秒后发送关闭信号
  4. 在主循环中使用 select! 宏等待通道和关闭通道的事件
  5. 当一个通道接收数据时,打印数据
  6. 关闭通道接收信号时,退出循环

程序中的 select! 宏使用了类似于 match语法,但是它可以同时等待多个异步事件。当其中一个事件发生时,宏将执行相应的代码块,并跳出循环。在本例中,当一个通道接收数据时,打印数据;当关闭通道接收到信号时,退出循环
select!经常与loop搭配使用,循环地从多个通道中接收事件处理

use std::time::Duration;

use tokio::select;

#[tokio::main]
async fn main() {
    let (sender1, mut receiver1) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (sender2, mut receiver2) = tokio::sync::mpsc::unbounded_channel::<String&gt;();
    let (sender3, mut receiver3) = tokio::sync::mpsc::unbounded_channel::<String&gt;();

    let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(());
    for i in 0..3 {
        sender1.send(i.to_string()).unwrap();
        sender2.send(i.to_string()).unwrap();
        sender3.send(i.to_string()).unwrap();
    }

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        shutdown_sender.send(()).unwrap(); //两秒后关闭
    });

    loop {
        select! {
            ret = receiver1.recv() =&gt; {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() =&gt; {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() =&gt; {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() =&gt; {
                println!("shutdown received");
                break;
            }
        };
    }
}

可能遇到的坑

阻塞

select中的各个分支并行执行的,这里并行是指分支中的各个future并行执行。不过一旦某个分支future完成并进入了分支代码块,如果在分支代码中有一些阻塞操作,则其他分支是没有机会执行的。
比如下面代码,在receiver1.recv()完成时,sleep了10s,sleep期间其他的分支是不会执行的。即使在2s发送shutdown信号select!因为无法及时处理信号,实际上循环也无法退出

 loop {
        select! {
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
                tokio::time::sleep(Duration::from_secs(10)).await;//这里等待期间,其他的分支是无法被执行的
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

这个坑在网络编程比较容易踩到,比如select这里是从channel中取出上层应用传来的数据,并将其写入socket中,而写socket操作是有可能阻塞的,阻塞期间其他的分支是无法执行的。

顺序

1、默认情况下select中的各个分支执行顺序随机的,比如上面例子中三个channel都有消息的情况下,具体去执行哪个分支是随机的。执行结果如下
在这里插入图片描述
2、如果想要区分优先级可以加标志biased,这样每次select将会按照从上到下的顺序poll每个future,也就是说优先级顺序是从上往下的。比如某些场景需要优先级处理各个channel中的数据时这个特性就很有用。代码如下

    loop {
        select! {
            biased;//按顺序优先执行
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

运行结果如下
在这里插入图片描述
3、顺序执行时注意饿死问题
添加biased标志后,顺序靠前的future总是先被执行,在上述例子中,极端情况下如果靠前的channel总是有数据,那后面的channel没有机会被执行。比如例子中如果前三个channel中一直有数据,那shutdown_receiver就无法收到shutdown信号,导致程序功能不符合预期。
解决这个问题很简单,就是把更关键的控制性的future放在最前方。

关于cancel safe

select!中如果某个分支future completed了,会将其他分支的future cancel掉,这个cancel操作要格外小心,因为如果future不是cancel safe可能会丢数据。tokio的官方文档中给出了常见cancel safe和不safefuture
那么如何判断自己实现future是否是cancel safe的呢? 很简单、只需要思考如果future中的代码执行.await时被cancel了,是否是安全的。我们来看下cancel unsafe的代码长啥样:

pub async fn read_and_write(mut message_recevier: UnboundedReceiver<Bytes>, mut file: File) {
    let message = message_recevier.recv().await.unwrap();
    file.write(&amp;message).await.unwrap();
}

方法从一个channel中读取消息,并将此消息写入文件中,这个future就明显不是cancel safe的。为啥呢?试想一下,此futurechannel中读到消息之后,在写文件时被cancel掉了,那message岂不是就丢了。
实际项目中一定要格外小心这个cancel safe问题,很容易造成丢数据或者数据重复等不良反应,而且一旦出现了还很难复现、不太容易想到是这里的问题。网络编程中尤其要注意tokio::io::AsyncWriteExt::write_all不是cancel safe的,因为它内部可能多次调用write操作才将所有缓冲区写入

数量

1、首先select!中的分支仅支持显式地用代码书写,无法动态增减。就是说在写代码时select中的futures数量就固定了,程序运行过程中无法动态删减
2、目前最多支持64个分支。

原文地址:https://blog.csdn.net/luchengtao11/article/details/134605619

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_29630.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注