1.引入相关插件

  # websocket
  web_socket_channel: ^2.4.0
  # 引入rxdart 解决Bad state: Stream has already been listened to.的报错问题
  rxdart: ^0.27.7
  # 状态管理*
  provider: ^6.0.5

2.代码编写封装

import 'dart:async';

import 'package:rxdart/subjects.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

typedef WebsocketMessageCallback = void Function(dynamic message);

/// 注册控制器需要哪些页面使用
///
/// 目前分三种类型:
///
/// 1.[customerLoginPage]游客模式下也就是在未登录时候用户处于登录相关页面)
/// 2.[customerMainPage]用户登录,处于主页及其他登录后的页面下
/// 3.[chatRoomPage]用户处在聊天室里(游客下的在线客服聊天室用户登录下的在线客服聊天室买卖用户之间聊天室enum StreamControllerNameEnum {
  customerLoginPage,
  customerMainPage,
  chatRoomPage;
}

class WebsocketHelper {
  WebsocketHelper._();

  static WebsocketHelper? _singleton;

  factory WebsocketHelper() => _singleton ??= WebsocketHelper._();

/// 用于连接websocket的链接uri
  Uri? wsUri;

/// websocket连接后的对象
  WebSocketChannel? _webSocketChannel;

/// 指定stream控制器存放map
  Map<String, BehaviorSubject<String>>? streamControllerList;

/// 是否开启心跳
  bool isOpenHeartBeat = true;

/// 用于控制心跳轮询
  StreamSubscription<String>? _subscription;

/// 是否是用户主动触发关闭连接
  bool isDisconnectByUser = false;

/// 另辟一个单独的消息回调函数
  WebsocketMessageCallback? messageCallback;

/// 连接断开回调
  Function()? onDone;

/// 连接出错回调
  Function? onError;

  /// step one - ex: ws://localhost:1234
  initSocket({required String wsPath, bool isOpenHeartBeat = true}) {
    if (_webSocketChannel != null) {
      print("socket实例存在, 请勿重复创建");
      return;
    }

    // 自己项目中后端需要前端一个登录令牌用于控制后端逻辑处理这里使用的是登录后的token
    var authorization = "登录后的token";
    wsUri = Uri.tryParse("$wsPath?Authorization=$authorization");
    // wsUri = Uri.tryParse(wsPath);
    if (wsUri == null) return;
    this.isOpenHeartBeat = isOpenHeartBeat;
    _connectWebsocket();
  }

  /// [isRunForReConnect] 是否是由重连机制触发的此方法
  void _connectWebsocket({bool isRunForReConnect = false}) {
    _webSocketChannel = WebSocketChannel.connect(wsUri!);
    if (!isRunForReConnect) {
      isDisconnectByUser = false;
    }
  }

  /// step two - listen
  void listen(
      {WebsocketMessageCallback? messageCallback,
      Function()? onDone,
      Function? onError}) {
    this.messageCallback = messageCallback;
    this.onDone = onDone;
    this.onError = onError;
    streamControllerList ??= <String, BehaviorSubject<String>>{
      // StreamControllerNameEnum.customerLoginPage.name: BehaviorSubject(),
      // StreamControllerNameEnum.customerMainPage.name: BehaviorSubject(),
      // StreamControllerNameEnum.chatRoomPage.name: BehaviorSubject()
    };

    // 监听系列连接情况(如收到消息、onDone:连接关闭、onError:接连异常)
    _webSocketChannel?.stream.listen((message) {
      print(
          "websocket onData message = ${message.toString()}, type = ${message.runtimeType}");
      if (message is String &amp;&amp; message.isEmpty) {
        // 消息为空可能得情况:心跳 or anotherreturn;
      }
      // 通过控制器把消息分发出去,在需要页面监听此流的消息
      streamControllerList?.forEach((key, value) {
        // print("key = $key, value.isClosed = ${value.isClosed}");
        if (!value.isClosed) {
          value.sink.add(message);
        }
      });
      this.messageCallback?.call(message);
    }, onDone: () {
      print("websocket onDone ...");
      this.onDone?.call();
      // 掉线重连
      reConnect();
    }, onError: (Object error, StackTrace stackTrace) {
      print(
          "websocket onError error = ${error.toString()}, stackTrace = ${stackTrace.toString()}");
      showToast(msg: "连接服务器失败!");
      this.onError?.call(error, stackTrace);
    }, cancelOnError: false);
    // 连接建立成功时的回调通知,可在此做心跳操作
    _webSocketChannel?.ready.then((value) {
      print("webSocket ready");
      isDisconnectByUser = false;
      if (isOpenHeartBeat) {
        // 收到连接成功的回馈,开始执行心跳操作
        _startHeartBeat();
      }
    });
  }

  /// 掉线重连
  void reConnect() {
    if (isDisconnectByUser) return;
    Future.delayed(
      const Duration(seconds: 3),
      () {
        // disconnect();
        _subscription?.cancel();
        _subscription = null;
        _webSocketChannel?.sink.close(status.abnormalClosure, "掉线重连");
        _webSocketChannel = null;
        _connectWebsocket();
        listen(
            messageCallback: messageCallback, onDone: onDone, onError: onError);
      },
    );
  }

/// 发送消息
  void sendMessage({required String message, bool needDisplayMsg = true}) {
    print("websocket sendMessage message = $message");
    if (needDisplayMsg) {
      streamControllerList?.forEach((key, value) {
        if (!value.isClosed) {
          value.sink.add(message);
        }
      });
    }

    _webSocketChannel?.sink.add(message);
  }

/// 开启心跳
  void _startHeartBeat() {
    if (_subscription != null) {
      print("websocket startHeartBeat _subscription != null");
      return;
    }
    Future.delayed(
      const Duration(seconds: 30),
      () {
        var pollingStream = StreamTool().timedPolling(
            const Duration(seconds: 30), () => Future(() => ""), 100000000);
        //进行流内容监听
        _subscription = pollingStream.listen((result) {
          sendMessage(message: "heart beat", needDisplayMsg: false);
        });
      },
    );
  }

/// 断开连接并销毁
  void disconnect({bool isDisconnectByUser = false}) {
    this.isDisconnectByUser = isDisconnectByUser;
    _subscription?.cancel();
    _subscription = null;
    streamControllerList?.forEach((key, value) {
      value.close();
    });
    streamControllerList?.clear();
    _webSocketChannel?.sink.close(status.normalClosure, "用户退出聊天界面聊天关闭");
    _webSocketChannel = null;
  }

/// 新建指定stream流控制器进行消息流回调
  setNewStreamController(StreamControllerNameEnum streamControllerName) {
    if (streamControllerList?.containsKey(streamControllerName.name) ?? false) {
      streamControllerList?[streamControllerName.name]?.close();
    }
    streamControllerList?[streamControllerName.name] = BehaviorSubject();
  }
}

3.提供一个轮询工具类StreamTool

import 'dart:async';

typedef FutureGenerator<T> = Future<T> Function();

class StreamTool {
  /// interval 轮询时间间隔
  /// maxCount 最大轮询数
  Stream<T> timedPolling<T>(Duration interval, FutureGenerator<T> future,
      [int maxCount = 1]) {
    StreamController<T>? controller;
    int counter = 0;
    bool polling = true;

    void stopTimer() {
      polling = false;
    }

    void tick() async {
      counter++;
      T result = await future();
      if (controller != null &amp;&amp; !controller.isClosed) {
        controller.add(result);
      }
      if (counter == maxCount) {
        stopTimer();
        controller?.close();
      } else if (polling) {
        Future.delayed(interval, tick);
      }
    }

    void startTimer() {
      polling = true;
      tick();
    }

    //StreamSubscription调用pausecancel时,stream里面轮询也能响应暂停或取消
    controller = StreamController<T>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer,
    );

    return controller.stream;
  }
}

4.新建全局的ChangeNotifier -> GlobalWebsocketVM

class GlobalWebsocketVM extends ChangeNotifier {
void startWebSocket() {
    WebsocketHelper()
      ..initSocket(wsPath: Api.wsUrlPath, isOpenHeartBeat: false)
      ..listen(
        messageCallback: (message) {
          // 延迟500毫秒,使listview进行滑动底部
         // gotoListBottom();
        },
        onDone: () {},
      );
  }

 /// 获取socket实时数据流
  ///
  /// 每次都需要绑定一个StreamController,避免数据流出现错乱情况
  Stream<String>? getMessageStream(
          StreamControllerNameEnum streamControllerName) =>
      (WebsocketHelper()..setNewStreamController(streamControllerName))
          .streamControllerList?[streamControllerName.name]
          ?.stream;
}

5.在入口类main.dart中MaterialApp中使用全局GlobalWebsocketVM

late GlobalWebsocketVM socketVM;

@override
void initState() {
  socketVM = GlobalWebsocketVM();
}

MaterialApp.router(
      debugShowCheckedModeBanner: false,
      onGenerateTitle: (context) => S.current.appName,
      theme: ThemeData(
        useMaterial3: true,
        colorScheme: ColorScheme.fromSeed(seedColor: Colors.white),
        appBarTheme: const AppBarTheme(
            color: Colors.white, surfaceTintColor: Colors.white),
        bottomAppBarTheme: BottomAppBarTheme.of(context)
            .copyWith(color: Colors.white, surfaceTintColor: Colors.white),
        scaffoldBackgroundColor: Colors.grey[200],
        cardTheme: const CardTheme(
            color: Colors.white, surfaceTintColor: Colors.white),
        progressIndicatorTheme:
            const ProgressIndicatorThemeData(color: AppColor.appThemeColor),
        // 统一修改输入框光标颜色文本选中颜色
        textSelectionTheme: const TextSelectionThemeData(
          cursorColor: AppColor.appThemeColor,
          selectionColor: AppColor.appThemeColor,
          selectionHandleColor: AppColor.appThemeColor,
        ),
        // ios主题设置
        cupertinoOverrideTheme:
            const CupertinoThemeData(primaryColor: AppColor.appThemeColor),
        iconButtonTheme: IconButtonThemeData(
          style: AppButtonStyle.stGlobalDefaultBtn,
        ),
        textButtonTheme: TextButtonThemeData(
          style: AppButtonStyle.stGlobalDefaultBtn,
        ),
        // primarySwatch: themeVM.theme,
      ),
      // locale: localeVM.getLocale(),
      builder: FlutterSmartDialog.init(
        builder: (context, mChild) {
          return MultiProvider(
            providers: [
              ChangeNotifierProvider<UserVM>(
                create: (_) => userVM,
              ),
              // 在这里每个页面添加GlobalWebsocketVM绑定
              ChangeNotifierProvider<GlobalWebsocketVM>(
                create: (_) => socketVM,
              ),
            ],
            builder: (context, child) => mChild ?? const SizedBox.shrink(),
          );
          /*return ChangeNotifierProvider<UserVM>(
            create: (_) => userVM,
            builder: (context, child) =>
            mChild ?? const SizedBox.shrink(),
          );*/
        },
      ),
      localizationsDelegates: const [
        S.delegate,
        GlobalMaterialLocalizations.delegate,
        GlobalWidgetsLocalizations.delegate,
        GlobalCupertinoLocalizations.delegate,
      ],
      supportedLocales: S.delegate.supportedLocales,
      // localeResolutionCallback: (_locale, supportedLocales) {
      //   if (localeVM.getLocale() != null) {
      //     //如果已经选定语言,则不跟随系统
      //     return localeVM.getLocale();
      //   } else {
      //     //跟随系统
      //     Locale locale;
      //     if (supportedLocales.contains(_locale)) {
      //       locale = _locale!;
      //     } else {
      //       //如果系统语言不是中文简体或美国英语,则默认使用美国英语
      //       locale = const Locale('en', 'US');
      //     }
      //     return locale;
      //   }
      // },
      routerConfig: RouterHelper.router,
    );

6.页面调用,在initState方法中建立连接,在build中使用StreamBuilder进行消息监听

@override
  void initState() {
    super.initState();
    // 建立连接ws global init
    context.read<GlobalWebsocketVM>().startWebSocket();
    
  }

@override
  Widget build(BuildContext context) {
    return Scaffold(
            resizeToAvoidBottomInset: false,
            appBar: TitleBar.build(title: "正与${model.titleContent}沟通"),
            // 监听聊天消息并刷新聊天列表
            body: StreamBuilder<String>(
          stream: context
        .read<GlobalWebsocketVM>()
        .getMessageStream(StreamControllerNameEnum.chatRoomPage),
          builder: (context, snapshot) {
            if (snapshot.connectionState == ConnectionState.active) {
              if (snapshot.data?.isEmpty ?? true) {
                return const SizedBox.shrink();
              }
              addMessageAndRefreshUI("orderNo", snapshot.data!);
              return const SizedBox.shrink();
            }
            return const SizedBox.shrink();
          },
          // catchError: (context, error) => error.toString(),
        ),,
          );
}

void addMessageAndRefreshUI(String tag, String message) {
    print("收到聊天消息:" + message);
}

7.发送消息

/// 在合适的地方(比如发送按钮点击发送聊天消息)
void sendChatMessage() {
WebsocketHelper().sendMessage(
      message: "我发送一条消息",
      needDisplayMsg: false,
    );
}

8.退出app断开websocket清理内存可以在任何想断开websocket的地方调用销毁

/// 通常在dispose调用销毁可以在任何想断开websocket的地方调用销毁
@override
  void dispose() {
    ScanHelper().dispose();
    WebsocketHelper().disconnect(isDisconnectByUser: true);
    super.dispose();
  }

原文地址:https://blog.csdn.net/qq_23404533/article/details/134737579

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_37660.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注