ReactiveCocoa 学习 - 5

详细研究一下RAC中的冷热信号

2016-06-12 | 阅读

本篇文章主要是在学习 美团的技术分享文章,里面几张图也是直接从这里拿过来的.

冷热信号详解

然后我们再来讨论这个RAC中一个重点问题.也就是冷热信号.

冷热信号的起源来自于RX的Hot ObservableCold Observable,两者的区别是:

  • 热信号是主动的,即使你没有订阅事件,它仍然会时刻推送。
  • 热信号可以有多个订阅者,是一对多,信号可以与订阅者共享信息

产生热信号的原因,有些信号我们不想再订阅时就执行一次,而是全局共享一个信号.如网络请求的信号,我们并不希望在每次对网络请求结果信号进行订阅时,就执行一次新的网络请求.所以我们要将信号转换为热信号,以只执行一次网络访问,而结果可以供多次订阅共享.

热信号都属于一个类RACSubject,这个类在RAC中表示一个可变的信号.我们写一段代码来演示一下其效果:

RACSubject *subject = [RACSubject subject];
RACSubject *replaySubject = [RACReplaySubject subject];

[[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
    // Subscriber 1
    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 1 get a next value: %@ from subject", x);
    }];
    [replaySubject subscribeNext:^(id x) {
        NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x);
    }];
    
    // Subscriber 2
    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 2 get a next value: %@ from subject", x);
    }];
    [replaySubject subscribeNext:^(id x) {
        NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x);
    }];
}];

[[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
    [subject sendNext:@"send package 1"];
    [replaySubject sendNext:@"send package 1"];
}];

[[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
    // Subscriber 3
    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 3 get a next value: %@ from subject", x);
    }];
    [replaySubject subscribeNext:^(id x) {
        NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x);
    }];
    
    // Subscriber 4
    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 4 get a next value: %@ from subject", x);
    }];
    [replaySubject subscribeNext:^(id x) {
        NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x);
    }];
}];

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
    [subject sendNext:@"send package 2"];
    [replaySubject sendNext:@"send package 2"];
}];

输出的结果是 :

2016-06-30 23:34:25.722 TestPods[17259:2788197] Subscriber 1 get a next value: send package 1 from subject
2016-06-30 23:34:25.723 TestPods[17259:2788197] Subscriber 2 get a next value: send package 1 from subject
2016-06-30 23:34:25.723 TestPods[17259:2788197] Subscriber 1 get a next value: send package 1 from replay subject
2016-06-30 23:34:25.724 TestPods[17259:2788197] Subscriber 2 get a next value: send package 1 from replay subject
2016-06-30 23:34:25.834 TestPods[17259:2788197] Subscriber 3 get a next value: send package 1 from replay subject
2016-06-30 23:34:25.834 TestPods[17259:2788197] Subscriber 4 get a next value: send package 1 from replay subject
2016-06-30 23:34:26.818 TestPods[17259:2788197] Subscriber 1 get a next value: send package 2 from subject
2016-06-30 23:34:26.818 TestPods[17259:2788197] Subscriber 2 get a next value: send package 2 from subject
2016-06-30 23:34:26.818 TestPods[17259:2788197] Subscriber 3 get a next value: send package 2 from subject
2016-06-30 23:34:26.819 TestPods[17259:2788197] Subscriber 4 get a next value: send package 2 from subject
2016-06-30 23:34:26.819 TestPods[17259:2788197] Subscriber 1 get a next value: send package 2 from replay subject
2016-06-30 23:34:26.819 TestPods[17259:2788197] Subscriber 2 get a next value: send package 2 from replay subject
2016-06-30 23:34:26.820 TestPods[17259:2788197] Subscriber 3 get a next value: send package 2 from replay subject
2016-06-30 23:34:26.820 TestPods[17259:2788197] Subscriber 4 get a next value: send package 2 from replay subject

根据时间线画图如下:

而如果是冷信号的情况的话,就有如下时间线:

可以发现,对于冷信号,类似于重播,每个订阅者都会观察到整个消息的处理过程.而对于subject,类似于直播,多个订阅者接收到一个信号的事件,并且如果信号已经发送过的消息,错过就无法再次接收到.

而还有一个replaySubject对象,将上面代码改写为该对象后,得到的效果图如下:

发现这个信号会保存之前发送过的信号,在新的对象订阅时,将之前的信号发送.

将一个冷信号变成热信号

RACSubject是支持RACSubscriber协议的,热信号的实现就是通过这个RACSubject来订阅一个冷信号,然后其他人在再来订阅这个RACSubject.

观察以下代码:

RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    NSLog(@"Cold signal be subscribed.");
    [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
        [subscriber sendNext:@"A"];
    }];
    
    [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
        [subscriber sendNext:@"B"];
    }];
    
    [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
        [subscriber sendCompleted];
    }];
    
    return nil;
}];

RACSubject *subject = [RACSubject subject];
NSLog(@"Subject created.");

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
    [coldSignal subscribe:subject];
}];

[subject subscribeNext:^(id x) {
    NSLog(@"Subscriber 1 recieve value:%@.", x);
}];

[[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 2 recieve value:%@.", x);
    }];
}];

输出结果为 :

2016-07-01 16:32:04.829 TestPods[17618:2842318] Subject created.
2016-07-01 16:32:07.029 TestPods[17618:2842318] Cold signal be subscribed.
2016-07-01 16:32:08.669 TestPods[17618:2842318] Subscriber 1 recieve value:A.
2016-07-01 16:32:10.319 TestPods[17618:2842318] Subscriber 1 recieve value:B.
2016-07-01 16:32:10.319 TestPods[17618:2842318] Subscriber 2 recieve value:B.

得到下图:

这样自行处理热信号,过于简单,有一些问题,如当Subject取消订阅时,不能取消对应的冷信号的订阅.而RAC中有对冷信号转换为热信号的标准接口 :

//创建一个普通的热信号
- (RACMulticastConnection *)publish;
// 创建一个热信号,并将值发送给一个RACSubject对象
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
// 创建重播热信号,并立即订阅,信号使用RACReplaySubject,即会重播已经发送的所有信号
- (RACSignal *)replay;
// 创建一个热信号,并立即订阅,使用RACReplaySubject,但设置capacity为1,即只会重播一次信号
- (RACSignal *)replayLast;
// 创建一个热信号.但不立即订阅,等待其他人订阅这个热信号.
- (RACSignal *)replayLazily;

RAC提供的热信号处理的具体实现

RAC提供以上五个方法中,最重要的就是 - (RACMulticastConnection *)multicast:(RACSubject *)subject,其他都是基于这个实现的.看一下这个函数的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}

/// implementation RACMulticastConnection

- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    if (self == nil) return nil;

    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}

#pragma mark Connecting

- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }

    return self.serialDisposable;
}

- (RACSignal *)autoconnect {
    __block volatile int32_t subscriberCount = 0;

    return [[RACSignal
        createSignal:^(id<RACSubscriber> subscriber) {
            OSAtomicIncrement32Barrier(&subscriberCount);

            RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
            RACDisposable *connectionDisposable = [self connect];

            return [RACDisposable disposableWithBlock:^{
                [subscriptionDisposable dispose];

                if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                    [connectionDisposable dispose];
                }
            }];
        }]
        setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}

简单说明一下流程:

  1. multicast:方法将 signalsubject作为参数创建一个RACMulticastConnect的热信号.
  2. RACMulticastConnectioninitWithSourceSignal: subject :初始化时,创建一个RACSerialDisposable对象用于取消订阅.
  3. 在对RACMulticastConnection对象调用connect时,会判断热信号是否已经与原始信号连接在一起了,如果没有的话,则用_signal这个对象订阅sourceSignal.
  4. 而这个_signal是一个RACSubject的对象,所以是一个热信号,会在connect时订阅sourceSignal,然后传递事件.

然后再来看一下 另外4个方法的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)publish {
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACSignal *)replay {
    RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLast {
    RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLazily {
    RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
    return [[RACSignal
        defer:^{
            [connection connect];
            return connection.signal;
        }]
        setNameWithFormat:@"[%@] -replayLazily", self.name];
}
  1. 对于publish,创建一个普通的RACSubject对象,一个普通的热信号.
  2. 对于replay ,创建一个RACReplySubject对象的热信号,这个热信号会重播之前的历史信号值.
  3. 对于replayLast ,以RACReplySubject对象创建一个热信号,但是设置Capacity为1,也就是只重发最后一次的历史值.
  4. 对于replayLazily ,使用defer命令,只在信号真正被订阅时,才去连接热信号.