当ReactiveCocoa遇上多线程

在使用RAC过程中,无法避免和各种block打交道,譬如RACDynamicSignal被订阅时触发的block(didSubscribe block),处理signal值事件的next block等等。初学者常会纠结这些block的执行时机,本文是我学习过程中对RAC线程问题的梳理。

重温signal的订阅顺序

来看段代码:

NSLog(@"create a signal");
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"didSubscribe block invoked"); // mark 1
[subscriber sendNext:@"value"]; // mark 2
[subscriber sendCompleted];
return nil;
}];
[signal subscribeNext:^(id x) { // mark 3
NSLog(@"capture next event: %@", x); // mark 4
}];
NSLog(@"subscribe finished");

这段代码的执行环境是main thread,执行结果是:

create a signal
didSubscribe block invoked
capture next event: value
subscribe finished

可以看出:

  1. Signal一旦被订阅(如上mark 3),didSubscribe block(如上mark 1)就立马被调用
  2. Signal一旦产生event(如上mark 2),相关的subscription block(如上mark 4)就立马被调用

P.S: 这两句话是下文分析异步订阅异步发送的关键。

异步订阅

何为异步订阅?指的是:signal的订阅逻辑相对于定义signal的上下文而言,不是同步的。

譬如,如下代码在主线程环境下执行:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[subscriber sendNext:@0]; // mark 2
[subscriber sendCompleted];
return nil;
}];
[[RACScheduler scheduler] schedule:^{ // mark 3
NSLog(@"2");
[signal subscribeNext:^(id x) {
NSLog(@"3"); // mark 4
}];
}];
NSLog(@"4");

mark 3异步(子线程)执行了一个订阅事件,需要关心的是:mark 2mark 4分别在哪个线程执行,主线程还是子线程?

答案是:mark 2mark 4的调用都在同一个子线程中完成。如下图:

asnyc-subscribe@2x.png

如何解释这一结果呢?先抛结论:signal(RACDynamicSignal实例)一旦被订阅,其didSubscribe block就立马被同步调用。

如下是代码分析:

// signal是RACDynamicSignal类型
[signal subscribeNext:{}] // step 1
->
// RACDynamicSignal#subscribe:
// RACDynamicSignal.m
if (self.didSubscribe != NULL) {
[RACScheduler.subscriptionScheduler schedule:^{ // step 2
self.didSubscribe(subscriber);
}];
}
->
// RACScheduler.subscriptionScheduler, schedule `didSubscribe` block
+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}
->
// RACSubscriptionScheduler#schedule:
- (RACDisposable *)schedule:(void (^)(void))block { // step 3
if (RACScheduler.currentScheduler == nil) {
return [self.backgroundScheduler schedule:block];
}
block();
return nil;
}

从如上的调用链可以看出,RACDynamicSignaldidSubscribe block最终在RACSubscriptionScheduler#schedule:中被调用。回到RACSubscriptionScheduler#schedule:

- (RACDisposable *)schedule:(void (^)(void))block {
if (RACScheduler.currentScheduler == nil) {
return [self.backgroundScheduler schedule:block];
}
block();
return nil;
}

RACScheduler.currentScheduler是什么鬼呢?简单来说,若当前上下文被包裹在RACScheduler#schedule:的block中,或者当前上下文处于main thread中,RACScheduler.currentScheduler返回有效值,否则返回nil。而所谓的当前上下文,其实就是[signal subscribeNext:{}]所处的上下文。

换句话说,如果在RACScheduler#schedule中 – 或者在主线程中 – 执行[signal subscribeNext:{}]逻辑,则2~4行的if语句不会被执行,而是直接执行didSubscribe这个block(第5行)。这充分验证了「在订阅线程里执行didSubscribe block」这个说法。

又一个问题来了:如果使用dispatch_async(someQueue, {})代替[RACScheduler scheduler] schedule:{},还有「在订阅线程里执行didSubscribe block」这个保证吗?

我验证过,同样没问题!

异步发送

如下代码在主线程环境下执行:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[[RACScheduler scheduler] schedule:^{ // mark 2
NSLog(@"2");
[subscriber sendNext:@0]; // mark 3
[subscriber sendCompleted];
}];
return nil;
}];
[signal subscribeNext:^(id x) {
NSLog(@"3"); // mark 4
}];
NSLog(@"4");

mark 2异步(子线程)产生了一个值事件,这次需要关心的是:mark 1mark 3以及mark 4分别在哪个线程执行,主线程还是子线程?
答案是:mark 1在主线程执行,mark 3mark 4的调用都在同一个子线程中完成。如下图:

asnyc-send@2x.png

Signal一旦产生event,相关的subscription block就立马被调用。这没啥好说的:

- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}

有了对「异步订阅」和「异步发送」的认知,更复杂的情况也比较容易分析了。

同步 + 异步发送

在上面的代码的基础上,做了一点修改,分别以同步和异步的方式产生值事件:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[subscriber sendNext:@"5"]; // mark 5
[[RACScheduler scheduler] schedule:^{ // mark 2
NSLog(@"2");
[subscriber sendNext:@"3"]; // mark 3
[subscriber sendCompleted];
}];
return nil;
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@", x); // mark 4
}];
NSLog(@"4");

显然,mark 4被调用两次,需要关心的是:这两次的执行线程是怎么样的?

答案是:5在主线程被打印,3在子线程被打印。如下图:

snyc-send-and-asnyc-send@2x.png

异步订阅+异步发送

继续看代码:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[[RACScheduler scheduler] schedule:^{ // mark 2
NSLog(@"2");
[subscriber sendNext:@"3"]; // mark 3
[subscriber sendCompleted];
}];
return nil;
}];
[[RACScheduler scheduler] schedule:^{ // mark 4
NSLog(@"5");
[signal subscribeNext:^(id x) {
NSLog(@"%@", x); // mark 5
}];
}];
NSLog(@"4");

mark 4mark 2分别会创建两个子线程,称为「子线程4」和「子线程2」,则打印结果如下:

asnyc-subscribe-and-asnyc-send@2x.png

subscribeOn:deliverOn:

上面的分析看出,异步订阅和异步发送会带来一些坑,臧成威总结了两点:

  • 订阅时机不确定。即didSubscribe block的执行线程是不确定的,它由订阅的上下文线程决定。
  • 发送时机不确定。subscription block的执行线程也不确定,它有sendNext:/sendError:/sendCompleted的执行线程决定。

P.S: 对于「订阅时机不确定」,更专业的说法是,副作用的执行线程不确定。

subscribeOn:deliverOn:分别可以解决这两个问题。

subscribeOn:

在上文「异步订阅」代码块的基础上添加一句:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[subscriber sendNext:@0]; // mark 2
[subscriber sendCompleted];
return nil;
}];
signal = [signal subscribeOn:[RACScheduler mainThreadScheduler]]; // add
[[RACScheduler scheduler] schedule:^{ // mark 3
NSLog(@"2");
[signal subscribeNext:^(id x) {
NSLog(@"3"); // mark 4
}];
}];
NSLog(@"4");

执行结果描述如下:

subscribe-on@2x.png

RACSignal+Operation.h告诫尽可能不要使用subscribeOn:

Creates and returns a signal that executes its side effects and delivers its events on the given scheduler.
Use of this operator should be avoided whenever possible, because the receiver’s side effects may not be safe to run on another thread. If you just want to receive the signal’s events on scheduler, use -deliverOn: instead.

deliverOn:

在上文「异步发送」代码块的基础上添加一句:

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSLog(@"1"); // mark 1
[[RACScheduler scheduler] schedule:^{ // mark 2
NSLog(@"2");
[subscriber sendNext:@0]; // mark 3
[subscriber sendCompleted];
}];
return nil;
}];
signal = [signal deliverOn:[RACScheduler mainThreadScheduler]]; // add
// 使用`signal = [signal deliverOnMainThread]`更简洁
[signal subscribeNext:^(id x) {
NSLog(@"3"); // mark 4
}];
NSLog(@"4");

执行结果描述如下:

deliver-on@2x.png

RACSignal+Operation.hdeliverOn:的描述如下:

Creates and returns a signal that delivers its events on the given scheduler. Any side effects of the receiver will still be performed on the original thread.
This is ideal when the signal already performs its work on the desired thread, but you want to handle its events elsewhere.
This corresponds to the ObserveOn method in Rx.