RxSwift订阅过程的源码分析

闲来无事,决定着手分析RxSwift的源码,以便对它有更好的理解,所针对版本是v3.0。

个人感觉,RxSwift(v3.0)的源码比ReactiveCocoa(v2.5)要更难读懂,决定从小处着手,尝试对下面一段代码进行分析:

/************ 一段简单的RxSwift订阅逻辑代码 ************/
let observable = Observable<Int>.create({ observer in
observer.onNext(0)
observer.onCompleted()
return Disposables.create {
print("subscription disposable")
}
}) // mark 1
let binaryDisposable = observable.subscribe(onNext: {
print($0)
}) // mark 2
binaryDisposable.dispose() // mark 3

mark 1处,创建一个AnonymousObservable对象,传入的closure本文称为「Subscribe Closure」。

mark 2处执行的逻辑其实是ObservableType的扩展方法:

/************ ObservableType+Extensions.swift ************/
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
let observer = AnonymousObserver<E> { e in
switch e {
case .next(let value):
onNext?(value)
case .error(let e):
onError?(e)
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.subscribeSafe(observer), // fake 1
disposable
)
}

在这层逻辑中,根据onNext/onError/onCompleted等closure创建一个observer(AnonymousObserver实例)(下文称该对象为Anonymous Observer),并在fake 1处订阅新创建的observer。最后返回一个disposable,该disposable的类型是BinaryDisposable(本文就叫它Binary Disposable,它包含两个子disposables,为了叙述方便,本文称它们为:

  • Disposer
  • User Disposable
  • User Disposable的dispose逻辑,本质上是用户在执行subscribe(onNext, onError, onCompleted, onDisposed)时传入的onDisposed closure;本文着重关心fake 1返回的Disposer。

Disposer是什么东西呢?这得展开subscribeSafe(_:)方法,该方法定义非常简单:

/************ ObservableType#subscribeSafe ************/
extension ObservableType {
/// All internal subscribe calls go through this method.
func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
return self.asObservable().subscribe(observer) // book 1
}
}

上文提到,mark 1创建了AnonymousObservable对象,AnonymousObservable的继承树是:

|---AnonymousObservable
|---Producer
|---Observable

简单来说,ObservableType#subscribeSafe(_:)book 1处,本质上是回调Producer#subscribe(_:),该方法有两个分支(if-else),只看其中一个即可:

return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer() // find 1
let sinkAndSubscription = self.run(observer, cancel: disposer) //find 2
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) // find 3
return disposer // find 4
}

代码中,find 1创建一个SinkDisposer对象,并在find 4处返回,由此可以断定,上文提到的Disposer是一个SinkDisposer实例。

分析代码可知,SinkDisposer对象是一个disposable,可以理解为一个容器,它持有两个子disposable:

  • Sink
  • Subscription Disposable

SinkDisposer被disposed时,这两个子disposable也会被disposed。

在RxSwift(v3.0)的设计中,基于Producer创建了各种各样的observer类型,每个observer几乎都对应一个sink,譬如Deferred-DeferredSink

AnonymousObservable而言,订阅其对象时,最终会触发其sink(AnonymousObservableSink)的run方法:

/************ AnonymousObservableSink#run ************/
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}

这里的parent._subscribeHandler其实就是上文提到的Subscribe Closure,给Subscribe Closure传入的参数并非上文创建的Anonymous Observable,而是一个新创建的AnyObserver实例(下文称之为Any Observer),值得一提的是,AnyObserver是一个struct类型。

换句话说,下面代码中的observer其实是一个AnyObserver类型。

上面文字有些凌乱,给张图看看吧!

reference-relationship@2x.png

上图是我阅读代码过程中结合自己的理解画的图。

RxSwift在构建observer、observable、disposable的关系时,实际上创建了很多内部类型,内部类型之间的关系比较凌乱,不太容易理清楚。但实际曝露给用户的只有Any Observer和Binary Disposable,以及observer。

代码中比较难梳理的是Disposer(SinkDisposer)和Sink(AnonymousObservableSink)的关系,尤其比较奇怪它们居然相互引用。难道不怕内存泄漏?然而,实际上是没有问题的,有时间再在此基础上梳理一下内存关系。

Q & A

上面写得有些乱,以Q & A形式重新梳理一下吧。

Q: Event的传播路径是怎样的?

AnonymousObservableAnonymousObservableSink而言,event的传播路径如下:

event-translate-path@2x.png

Any Observer暴露给用户,用户在它的基础上发射event,之后经过Sink转发,最后到达Anonymous Observer,后者封装了用户定义的onNext closure,进而进行处理。

然而,当Sink的disposed状态为true时,event就不传播到Anonymous Observer。据我所知,目前有两种情况会触发Sink的dispose:

  • Any Observer传入的event为error或者completed时
  • Binary Disposable被disposed时,会导致Disposer被disposed,进而触发Sink被disposed

Q: 如何理解User Disposable?

就执行时机来说,User Disposable里封装的onDisposed closure在两种情况下会被回调,但是它只会被调用一次,User Disposable的本质是AnonymousDisposable实例(若onDisposed不为空),它在执行disposed closure后,将其置为nil,以确保下次不会再被执行。

就功能来说,目前还不理解这个有啥用,毕竟ReactiveCocoa(v2.5)中没有这个东西,希望以后能补充吧!

Q: 如何理解Disposer?

基于Producer的observer,其Disposer都是一样的,都是SinkDisposer类型。

从代码上看,它主要用来管理Subscription Disposable和Sink,确保在合适的时机对它们进行dispose。

Q: 如何理解Sink?

对Sink的理解还不透彻,感觉在讨论多个级联operation时才能比较清晰地看出它的意义。

Q: Subscribe Closure里的observer一定都是AnyObserver类型吗?

不晓得,以后补充吧!

本文参考