阻塞数据流的检测和测试 – RxJava

原文链接 作者:Tomasz Nurkiewicz 译者:simonwang

想象一下,你有一个数据流以不定的频率推送一些event。在某些时候,你期望数据流会以每秒几十个消息的频率进行推送,但事实上几秒时间内一个event都没被推送。如果你的数据流是通过web socket,SSE或者其他网络协议进行传输,那么长时间的数据静默则可以断定为网络问题。这时我们经常会人为地发送events(pings)来确保:

  • clients正常运行
  • 让clients知道我方也正常运行

举个具体的例子,假如我们有一个Flowable流来提供events,如果超过1秒没有接收到event,我们就会发送”PING”字符串作为占位符。当静默的时间过长时,那么”PING”每秒都会出现一次。那么这样的需求我们怎样能利用RxJava来实现呢?最明显的,但不正确的做法是将两个原始的流merge在一起:

Flowable<String> events = //...
Flowable<String> pings = Flowable
            .interval(1, SECONDS)
            .map(x -> "PING");
 
Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith()操作很关键:它将真正的events和常量pings流联合起来,当没有events出现时,”PING”会出现。不幸的是,这种做法将会与原始流完全脱节。因为即使在events正常出现时,pings的发送并不会停止。而且当静默开始时,我们并不能精确地在每秒结束时发送”PING”。当然,如果你对这种机制ok的话,那就不用往下看了。

debounce()操作符

另一个更复杂的方法需要能够能识别出持续至少1秒的静默,可以使用timeout()操作符做到这点。不幸的是,它能抛出TimeoutException并且从上层数据流那里unsubscribe-这种方法太不稳定。我们仅想要得到某种notification,事实证明debounce()可以做到这点。正常情况下,这个操作符为了防止在极短时间段内有新的events到来,它会推迟上一个event的推送(debounce()的详细作用见ReactiveX官网定义)。我们重写这个方法:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

这就意味着,如果在1秒内某个event没有后续的其他event跟随,那么delayed流就会将这个event发送出去。严格说来,如果events流生产event的速度足够快的话,delayed流将不会发送任何东西,我们将会利用delayed流获取到静默的产生:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

mergeWith()与静态方法merge()的使用就结果而言并没有差异。如果events流中event的接收与发送频繁,那么delayed流不会接收到任何的event,”PING”信息也就不会被发送。然而,如果events流超过1秒没有发送任何event的话,delayed会将events流中最近接收到的一个event映射为”PING”。这种方法很聪明,但有个缺陷。在发生阻塞后,delayed流只会发送一次”PING”,并不会周期性的每1秒钟发送一次。然而,解决这点很简单!与其将最近接收到的event转化为单个”PING”,我们可以将它转化为周期性的pings序列:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

上面的代码仍然有缺陷,因为每次数据流发生一点点阻塞,之后的每秒我们都会发送pings。但我们需要的是在数据流恢复正常的events流动后,pings的推送要马上停止,这一点我们并没有做到。每一次上层流中发生的阻塞,都会产生一个新的无限pings流,使其出现在最终的merge流中。无论如何,在events流恢复正常的event推送之后,我们都应该通知delayed流停止发送pings。而takeUntil()则刚好可以做到这点!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
                .takeUntil(events)
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

花点时间弄明白上面的代码片段。每当events流超过1秒没有动静的时候,delayed流都会发送一个event。当pings流接收到了从delayed流那里发送过来的event后,就会开始推送一个间隔1秒的”PING”序列。然而在events流重新开始发送event后,pings流则会终止。所有的这些都可以用一个嵌套的语句表示:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events
        .mergeWith(
                events
                        .debounce(1, SECONDS)
                        .flatMap(x1 -> Flowable
                                .interval(0, 1, SECONDS)
                                .map(e -> "PING")
                                .takeUntil(events)
                        ));

Testability

好的,所有这些都已准备完毕,而我们该如何对一个有着三层嵌套的由事件驱动的代码块进行测试呢?我们该如何确定pings能出现在正确的时间,并且当阻塞结束时它能正确终止呢?要怎样模拟这种时变的场景?不用担心,RxJava有很多杀手锏,其中与时序有关的测试手段可能是最厉害的一个。首先,让我们对pinging代码进行一些修改,使之多一点可测试性和通用性:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {
    return events
            .mergeWith(
                    events
                            .debounce(1, SECONDS, clock)
                            .flatMap(x1 -> Flowable
                                    .interval(0, 1, SECONDS, clock)
                                    .map(e -> ping)
                                    .takeUntil(events)
                            ));
 
}

这个工具方法能够处理任意的T流,并且如果在一段时间内T流不产出任何event的话,它能将pings合到原始流中区去。在我们的测试中可以这样使用它:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

PublishProcessor是Flowable的子类,所以我们能够用它来充当events流。另一方面,我们可以利用它的onNext()方法有命令地发送events:

events.onNext("A");

如果某人监听到了events流,他会马上接收到”A”event。而clock又有什么用呢?RxJava中的每个与时间有关的单一运算符(如debounce(), interval(), timeout(), window())都有一个可选的Scheduler参数,它的作用是作为外部的时间源。特殊的TestScheduler就是一个人造的时间源,我们可以完全的掌控它。也就是说,如果我们不调用advanceTimeBy(),时间就是静止的:

clock.advanceTimeBy(999, MILLISECONDS);

999毫秒不是个巧合,Pings要精确的出现在1秒后,所以999毫秒之前pings应该不可见。现在是时候展示完整的测试实例了:

@Test
public void shouldAddPings() throws Exception {
    PublishProcessor<String> events = PublishProcessor.create();
    final TestScheduler clock = new TestScheduler();
    final Flowable<String> eventsWithPings = withPings(events, clock, "PING");
 
    final TestSubscriber<String> test = eventsWithPings.test();
    events.onNext("A");
    test.assertValues("A");
 
    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("B");
    test.assertValues("A", "B");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B");
 
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING");
 
    events.onNext("C");
    test.assertValues("A", "B", "PING", "C");
 
    clock.advanceTimeBy(1000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");
 
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");
 
    events.onNext("D");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");
 
    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("E");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
 
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");
 
    clock.advanceTimeBy(3_000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

看起来像是一堵墙,但这确实是我们逻辑的一个完整测试场景。它确保pings精确地出现在了1000毫秒之后,并且周期性地重复出现,在正常的event再次出现后会马上停止出现。最重要的一点是:这个测试是100%可预见的,并且速度极快。没有Awaitility,busy waiting,polling以及间歇性的测试失败和迟缓。人工的clock可以完全对其进行控制,以确保所有的组合流都能按预期的工作。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 阻塞数据流的检测和测试 – RxJava

FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

您必须 登陆 后才能发表评论

return top