本文介绍: 首先,需要在Flutter项目的​​pubspec.yaml​​​文件添加​​mqtt_client​​依赖。其中host主机名port端口号cid客户端ID,你可以根据需要为其分配一个唯一标识。一旦连接到MQTT服务器,你可以订阅感兴趣主题接收消息使用MQTT客户端发布消息到特定的主题。​​​来构建消息有效载荷,然后使用​​。//用于监听订阅主题的消息到达。方法发布消息到指定主题方法订阅一个主题,并使用。在上面的代码中,使用​​。流来监听收到的消息。

1.添加依赖:

首先,需要在Flutter项目的​​pubspec.yaml​​​文件添加​​mqtt_client​​依赖

dependencies:
  #https://pub.dev/packages/mqtt_client
  mqtt_client: ^10.0.02.创建MQTT客户端连接到MQTT服务器:

2.创建一个MQTT客户实例来进行连接通信

Future<MqttServerClient&gt; connect(String cid) async {

    print('mqtt connect host = $host cid = $cid ');
    MqttServerClient client =
    MqttServerClient.withPort(host, cid, port);
    client.logging(on: true);
    client.onConnected = onConnected;
    client.onDisconnected = onDisconnected;
    client.onUnsubscribed = onUnsubscribed;
    client.onSubscribed = onSubscribed;
    client.onSubscribeFail = onSubscribeFail;
    client.pongCallback = pong;

    final connMessage = MqttConnectMessage()
        .authenticateAs(user, pwd)
        .keepAliveFor(60)// 保持连接时间单位为秒
        .withWillTopic('willtopic')
        .withWillMessage('Will message')
        .startClean()// 清理会话
        .withWillQos(MqttQos.atLeastOnce);
    client.connectionMessage = connMessage;
    try {
      await client.connect();
    } catch (e) {
      print('Exception: $e');
      client.disconnect();
    }

    return client;
  }

其中host 是主机名,port端口号cid是客户端ID,你可以根据需要为其分配一个唯一的标识。

3.订阅主题:

一旦连接到MQTT服务器,你可以订阅感兴趣的主题以接收消息。以下是订阅主题的示例代码
//用于监听订阅主题的消息到达。

    client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
      final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
      final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      // 解码包含中文字符字符串
      final String decodedString = utf8.decode(pt.codeUnits);

      LogI('Received message: $decodedString from topic: ${c[0].topic}');
    });

通过​​client.subscribe​​​方法订阅一个主题,并使用​​client.updates​​流来监听收到的消息。

4.发布消息:

使用MQTT客户端来发布消息到特定的主题。以下是发布消息的示例代码

final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString('Hello from Flutter');

client.publishMessage('your_topic', MqttQos.exactlyOnce, builder.payload);

在上面的代码中,使用​​MqttClientPayloadBuilder​​​来构建消息的有效载荷,然后使用​​client.publishMessage​​方法来发布消息到指定的主题。

5.断开连接:

当你不再需要与MQTT服务器通信时,记得断开连接以释放资源

client.disconnect();

完整代码

class XMqttClient {

  static final XMqttClient _instance = XMqttClient._();

  static XMqttClient get instance => _instance;

  static const host = '139.196.xx.xx';//替换成你自己主机

  static const port = 1883;//端口号

  static const user = 'admin';//用户

  static const pwd = 'public';//密码

  List<String> topics = [];

  MqttClient? client;

  XMqttClient._() {
    _initMqtt();
  }


  _initMqtt() async {
    //clientld 确保唯一性,否则如果两台机器的clientld 相同 则会连上立刻断开连接!!!
    String clientId = '${DateTime.now().millisecondsSinceEpoch}asc';

    client = await connect(clientId);
  }

  Future<MqttServerClient> connect(String cid) async {

    print('mqtt connect host = $host cid = $cid ');
    MqttServerClient client =
    MqttServerClient.withPort(host, cid, port);
    client.logging(on: true);
    client.onConnected = onConnected;
    client.onDisconnected = onDisconnected;
    client.onUnsubscribed = onUnsubscribed;
    client.onSubscribed = onSubscribed;
    client.onSubscribeFail = onSubscribeFail;
    client.pongCallback = pong;

    final connMessage = MqttConnectMessage()
        .authenticateAs(user, pwd)
        .keepAliveFor(60)// 保持连接时间单位为秒
        .withWillTopic('willtopic')
        .withWillMessage('Will message')
        .startClean()// 清理会话
        .withWillQos(MqttQos.atLeastOnce);
    client.connectionMessage = connMessage;
    try {
      await client.connect();
    } catch (e) {
      print('Exception: $e');
      client.disconnect();
    }

    //用于监听已订阅主题的消息到达。
    client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
      final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
      final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      // 解码包含中文字符字符串
      final String decodedString = utf8.decode(pt.codeUnits);

      LogI('Received message: $decodedString from topic: ${c[0].topic}');
    });

    return client;
  }
  

  ///订阅一个主题
  _subscribe(String topic) {
    client?.subscribe(topic, MqttQos.atLeastOnce);
  }

  ///订阅多个主题
  topicSubscribe(List<String> topics) async {
    this.topics.addAll(topics);

    if (client?.connectionStatus?.state == MqttConnectionState.connected) {
      topics.forEach((topic) {
        _subscribe(topic);
      });
    } else {
      //未连接成功 每隔3s重新订阅
      Future.delayed(const Duration(seconds: 3), () {
        topicSubscribe(topics);
      });
    }
  }

  ///取消订阅
  _unsubscribe() {
    client?.unsubscribe('topic/test');
  }

  ///断开连接
  _disconnect() {
    client?.disconnect();
  }


  // 连接成功
  void onConnected() {
    print('连接成功');
  }

// 连接断开
  void onDisconnected() {
    print('连接断开');
  }

// 订阅主题成功
  void onSubscribed(String topic) {
    print('订阅主题成功: $topic');
  }

// 订阅主题失败
  void onSubscribeFail(String topic) {
    print('订阅主题失败 $topic');
  }

// 成功取消订阅
  void onUnsubscribed(String? topic) {
    print('成功取消订阅: $topic');
  }

// 收到 PING 响应
  void pong() {
    print('收到 PING 响应 Ping response client callback invoked');
  }


}

原文地址:https://blog.csdn.net/WriteBug001/article/details/134697846

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_39118.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注