RxSwift订阅过程的内存分析

RxSwift订阅过程的源内存分析中,对RxSwift的订阅过程进行了简单的源码分析,本文在此基础上分析内存(对象间的持有关系)。

先来看一段伪代码:

let observable = Observable<Int>.create({ observer in
// subscribe closure
})
let binaryDisposable = observable.subscribe(onNext: {
// onNext closure
})

在上面代码中,可以在subscribe closure中发射event,在onNext closure(onErroronCompleted略过)处接收event。然而,onNext closure中并不总能接收到有效的next event,使用ReactiveCocoa的经验告诉我,有两种情况,onNext closure无法接收到next event:

  • 过早取消订阅,即binaryDisposable过早被disposed
  • subscribe closure中发射了error或者completed,之后的值事件无法传播到onNext closure

本文就基于这两种情况分析各种组件在各个阶段的存在情况和引用情况。

主动取消订阅

在下面一段代码中,observable被订阅后,两秒后才发射next事件和completed事件:

{
let observable = Observable<Int>.create({ observer in
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2, execute: {
observer.onNext(0)
observer.onCompleted()
// timing 4
})
return Disposables.create {
print("Subscription Disposable")
}
})
let binaryDisposable = observable.subscribe(onNext: {
print("onNext closure: \($0)")
}, onDisposed: {
print("onDisposed closure")
}
) // timing 1
binaryDisposable.dispose() // timing 2
} // timing 3

上面代码逻辑中,标记了4个时间点。创建一个observable,然后订阅(timing 1),接着取消订阅(timing 2),2秒后(timing 4)发送值事件和completed事件。

现在关心的是4个状态下的各个部件的存在情况。

timing 1

cancel-subscribe-timing-1@2x.png

timing 1处,各种组件的引用情况比较容易理清,参考RxSwift订阅过程的源内存分析,需要说明的是,在本文应用场景中,Some Thread实际上就是Main Thread。

timing 2

cancel-subscribe-timing-2@2x.png

timing 2处,Binary Disposable主动触发dispose,因此它持有的两个子disposables(即Disposer和User Disposable)也会触发dispose;另外,Disposer也管理了两个子disposables:Subscription Disposable,因此连带着把后两者也给dispose了,参考SinkDisposer的实现代码,还可以知道,它在dispose()中,除了dispose掉两个子disposables外,还会释放对它们的强持有。

SinkDisposer#dispose()的逻辑如下:

title="SinkDisposer#dispose"
func dispose() {
let previousState = OSAtomicOr32OrigBarrier(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}

timing 3

cancel-subscribe-timing-3@2x.png

timing 3中,Binary Disposable超出作用域,因此被释放,但是User Disposable和Disposer仍然被持有,因此仍然存在于内存中。

timing 4

cancel-subscribe-timing-4@2x.png

2秒过后,subscribe closure中的逻辑全部执行完,Some Thread会释放该closure,Any Observer就不复存在了,它引用或间接引用的其他内部资源也便被释放掉了。

整个世界都清净了。

当发射error或completed事件时

接下来看另外一种情况,当「主动取消订阅」的时间晚于completed事件发射时间时:

{
let observable = Observable<Int>.create({ observer in
observer.onNext(0)
observer.onCompleted() // timing 2
return Disposables.create {
print("Subscription Disposable")
}
})
let binaryDisposable = observable.subscribe(onNext: {
print("onNext closure: \($0)")
}, onDisposed: {
print("onDisposed closure")
}
) // timing 1
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2, execute: {
binaryDisposable.dispose() // timing 4
})
} // timing 3

上面逻辑中,observable被订阅后立马发送一个next和completed;在2秒后才取消订阅。

timing 1

error-completed-timing-1@2x.png

timing 1处和上文的一样。

timing 2

error-completed-timing-2@2x.png

timing 2处(准确来说,在timing 2之后的某个点),Sink的on(_)方法在处理error或completed事件时,做两件事情:

  • 将event下发(forward on)到Anoymous Observer,后者除了处理event外,还会将它所引用的User Disposable给dispose掉
    回调自己的dispose()方法
  • Sink#dispose()又会触发Disposer#dispose()…故而上文的timing 2所做的事情在这里又会做一遍。

timing 3

error-completed-timing-3@2x.png

timing 3时,Some Thread不再持有Any Observer,后者持有的和间接持有的资源随后也会被释放。

timing 4

error-completed-timing-4@2x.png

timing 4后,或说2秒后,Some Thread不再持有Binary Disposable,便被释放掉了…

世界又清净了…