AsyncSequence并发框架SE-298 提案的一部分。它的名字意味着它是一个提供异步顺序迭代访问元素类型。换句话说:它是我们在 Swift 中熟悉的常规序列一个异步变体

就像你不会经常创建你的自定义序列一样,我不期望你经常创建一个自定义AsyncSequence 实现。然而,由于与 AsyncThrowingStream和AsyncStream 等类型一起使用,你很可能不得不与异步序列一起工作。因此,我将指导你使用 AsyncSequence 实例进行工作


什么是 AsyncSequence?

AsyncSequence我们在Swift中熟悉的 Sequence一个异步变体。由于它的异步性,我们需要使用 await 关键字,因为我们处理的是异步定义方法。如果你没有使用async/await,我鼓励你阅读我的文章Swift 中的async/await ——代码实例详解

可以随着时间的推移而变得可用,这意味着一个 AsyncSequence 在你第一次使用它时可能不包含也可能包含一些,或者全部的值。

重要的是要理解 AsyncSequence 只是一个协议。它定义如何访问值,但并不产生或包含值。AsyncSequence 协议实现者提供了一个 AsyncIterator,并负责开发潜在存储值。

Function Note
contains(_ value: Element) async rethrows -> Bool Requires Equatable element
contains(where: (Element) async throws -> Bool) async rethrows -> Bool The async on the closure allows optional async behavior, but does not require it
allSatisfy(_ predicate: (Element) async throws -> Bool) async rethrows -> Bool
first(where: (Element) async throws -> Bool) async rethrows -> Element?
min() async rethrows -> Element? Requires Comparable element
min(by: (Element, Element) async throws -> Bool) async rethrows -> Element?
max() async rethrows -> Element? Requires Comparable element
max(by: (Element, Element) async throws -> Bool) async rethrows -> Element?
reduce<T>(_ initialResult: T, _ nextPartialResult: (T, Element) async throws -> T) async rethrows -> T
reduce<T>(into initialResult: T, _ updateAccumulatingResult: (inout T, Element) async throws -> ()) async rethrows -> T

对于这些函数我们首先定义一个符合 AsyncSequence 协议的类型。该名称是模仿现有标准库“序列”类型,如 LazyDropWhileCollectionLazyMapSequence然后我们AsyncSequence扩展中添加一函数,该函数创建新类型(使用’ self ‘作为’ upstream ‘)并返回它。

map<T>(_ transform: (Element) async throws -> T) -> AsyncMapSequence
compactMap<T>(_ transform: (Element) async throws -> T?) -> AsyncCompactMapSequence
flatMap<SegmentOfResult: AsyncSequence>(_ transform: (Element) async throws -> SegmentOfResult) async rethrows -> AsyncFlatMapSequence
drop(while: (Element) async throws -> Bool) async rethrows -> AsyncDropWhileSequence
dropFirst(_ n: Int) async rethrows -> AsyncDropFirstSequence
prefix(while: (Element) async throws -> Bool) async rethrows -> AsyncPrefixWhileSequence
prefix(_ n: Int) async rethrows -> AsyncPrefixSequence
filter(_ predicate: (Element) async throws -> Bool) async rethrows -> AsyncFilterSequence

创建 AsyncSequence

创建一个自定义的 AsyncSequence。

为了更好理解 AsyncSequence如何工作的,我将演示一个实现实例。然而,在定义你的 AsyncSequence自定义实现时,你可能想用 AsyncStream 来代替,因为它的设置更方便。因此,这只是一个代码例子,以更好理解 AsyncSequence工作原理


struct Counter: AsyncSequence {
    typealias Element = Int

    let limit: Int

    struct AsyncIterator : AsyncIteratorProtocol {
        let limit: Int
        var current = 1
        mutating func next() async -> Int? {
            guard !Task.isCancelled else {
                return nil

            guard current <= limit else {
                return nil

            let result = current
            current += 1
            return result

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(howHigh: limit)

如您所见,我们定义了一个实现 AsyncSequence 协议的 Counter 结构体。该协议要求我们返回一个自定义AsyncIterator我们使用内部类型解决了这个问题我们可以决定重写示例消除内部类型的需求

struct Counter: AsyncSequence, AsyncIteratorProtocol {
    typealias Element = Int

    let limit: Int
    var current = 1

    mutating func next() async -> Int? {
        guard !Task.isCancelled else {
            return nil

        guard current <= limit else {
            return nil

        let result = current
        current += 1
        return result

    func makeAsyncIterator() -> Counter {

我们现在可以self 作为迭代器返回,并保持所有逻辑的集中。

注意,我们必须通过提供 typealias 来帮助编译器遵守 AsyncSequence 协议。

next() 方法负责整体数值进行迭代。我们的例子归结为提供尽可能多的计数值,直到我们达到极限。我们通过Task.isCancelled检查实现取消支持



现在我们知道什么AsyncSequence 以及它是如何实现的,现在是时候开始迭代这些值了。

以上述例子为例,我们可以使用 Counter 开始迭代

for await count in Counter(limit: 5) {
print("Counter finished")

// Prints:
// 1
// 2
// 3
// 4
// 5
// Counter finished

我们必须使用 await 关键字,因为我们可能会异步接收数值。一旦不再有预期的值,我们就退出for循环。异步序列的实现可以通过next() 方法中返回 nil表示达到极限。在我们的例子中,一旦计数器达到配置的极限,或者迭代取消,我们就会达到这个预期:

mutating func next() async -> Int? {
    guard !Task.isCancelled else {
        return nil

    guard current <= limit else {
        return nil

    let result = current
    current += 1
    return result



for await count in Counter(limit: 5).filter({ $0 % 2 == 0 }) {
print("Counter finished")

// Prints: 
// 2
// 4
// Counter finished

或者我们可以在迭代之前将计数映射为一个 String

let counterStream = Counter(limit: 5)
    .map { $0 % 2 == 0 ? "Even" : "Odd" }
for await count in counterStream {
print("Counter finished")

// Prints:
// Odd
// Even
// Odd
// Even
// Odd
// Counter finished

我们甚至可以使用 AsyncSequence 而不使用for循环通过使用 contains方法

let contains = await Counter(limit: 5).contains(3)
print(contains) // Prints: true

注意,上述方法是异步的,意味着它有可能无休止地等待一个值的存在,直到底层AsyncSequence 完成。


AsyncSequence 是我们在Swift中熟悉的常规 Sequence 的异步替代品。就像你不会经常自己创建一个自定义 Sequence 一样,你也不太可能创建自定义的异步序列。






