小规模的流处理框架.Part 2: RxJava 1.x/2.x

原文链接 作者:Tomasz Nurkiewicz 译者:simonwang
part 1: thread pools中,我们设计并实现了一个相对简单的实时处理events的系统。在阅读本文之前你应该确保已经读懂了Part1的那篇文章,这里重新阐述一遍系统的设计要求:

系统能够每秒处理1000个任务,每一个Event至少有2个属性:

  • clientId-我们希望每一秒有多个任务是在同一个客户端下处理的(译者:不同的clientId对应不同的ClientProjection,即对应不同的一系列操作)
  • UUID-全局唯一的

消费一个任务要花费10毫秒,为这样的流设计一个消费者:

  1. 能够实时的处理任务
  2. 和同一个客户端有关的任务应该被有序地处理,例如你不能对拥有同一个clientId的任务序列使用并行处理
  3. 如果10秒内出现了重复的UUID,丢弃它。假设10秒后不会重复

到目前为止我们提出了线程池和共享缓存结合的设计,而在这篇文章中我们会使用RxJava进行实现。开始之前,我从没有提到EventStream是如何实现的,仅仅是给出了API:

[code lang=”java”]
interface EventStream {

void consume(EventConsumer consumer);

}
[/code]

事实上为了能够进行测试,我建立了一个RxJava流,它所有的行为都符合设计要求:

[code lang=”java”]
@Slf4j
class EventStream {

void consume(EventConsumer consumer) {
observe()
.subscribe(
consumer::consume,
e -> log.error("Error emitting event", e)
);
}

Observable<Event> observe() {
return Observable
.interval(1, TimeUnit.MILLISECONDS)
.delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
.map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
.flatMap(this::occasionallyDuplicate, 100)
.observeOn(Schedulers.io());
}

private Observable<Event> occasionallyDuplicate(Event x) {
final Observable<Event> event = Observable.just(x);
if (Math.random() >= 0.01) {
return event;
}
final Observable<Event> duplicated =
event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
return event.concatWith(duplicated);
}

}
[/code]

虽然你们没必要明白这个流模拟器是怎么工作的,但它的工作过程相当有趣。首先我们使用interval()产生一个每毫秒输出一个Long型值(0,1,2)的稳定流(这是因为设计要求系统每秒能处理1000个event)。然后我们使用delay()对每个event进行0到1000微秒内的随机延迟,在这之后events出现的时机就变得不可预测,就更符合真实情况。最终我们使用map()将每个Long型值映射到一个随机的Event上,每个Event都包含一个1000到1100(inclusive-exclusive)之间的clientId。
最后一点就有趣了,我们想模拟随机的重复事件。为了做到这点,我们使用flatMap()将每个event映射到自身(99%情况下)。然而在剩下的1%情况中,我们将event返回两次,第二次出现的时间延迟了10ms到5s。实际应用时,重复的event与第一次出现的event之间会相隔几百个其他的event,这就使得流的行为更加符合真实情况。
有两种方法可以与EventStream进行交互-基于回调的consume()和基于流的observer()。我们可以利用Observable快速地建立处理管道,这种方法的功能和part1中的非常的像但更加简单。

Missing backpressure

首先利用RxJava实现最初的方案非常简短:

[code lang=”java”]
EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
new ProjectionMetrics(
new MetricRegistry()));

es.observe()
.subscribe(
clientProjection::consume,
e -> log.error("Fatal error", e)
);
[/code]

(ClientProjection,ProjectionMetrics等来自于part1).使用以上的代码几乎会立刻抛出MissingBackpressureException,这也是预料之中的。你们记得吗,我们在part1中最初的方案会运行的越来越慢是因为处理event的潜伏期越来越长。RxJava会尽量避免这种情况,而且也会避免队列溢出。之所以会抛出MissingBackpressureException是因为消费者(ClientProjection)没有能力实时地处理event。这是一个fail-fast机制。聪明的做法就是将处理的过程移到一个独立的线程池,就像之前那样,但这次要使用RxJava来实现:

[code lang=”java”]
EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
new ClientProjection(
new ProjectionMetrics(
new MetricRegistry())));

es.observe()
.flatMap(e -> clientProjection.consume(e, Schedulers.io()))
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.subscribe(
c -> log.info("Processed {} events/s", c),
e -> log.error("Fatal error", e)
);
[/code]

EventConsumer中添加了一个辅助方法,它能够利用提供的Scheduler异步地处理event:

[code lang=”java”]
@FunctionalInterface
interface EventConsumer {
Event consume(Event event);

default Observable<Event> consume(Event event, Scheduler scheduler) {
return Observable
.fromCallable(() -> this.consume(event))
.subscribeOn(scheduler);
}

}
[/code]

使用flatMap()在一个独立的Scheduler.io()中处理event,这样每一个消费过程都是异步调用的。这次event的处理已经符合实时性的要求了,但还有一个更大的问题。我使用FailOnConcurrentModification对ClientProjection进行包装是有原因的。events的处理都是彼此独立的,所以对于同一个clientId有可能会并发地处理两个event,这样并不好。幸运的是比起使用线程来说,用RxJava解决这个问题要更加简单:

[code lang=”java”]
es.observe()
.groupBy(Event::getClientId)
.flatMap(byClient -> byClient
.observeOn(Schedulers.io())
.map(clientProjection::consume))
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.subscribe(
c -> log.info("Processed {} events/s", c),
e -> log.error("Fatal error", e)
);
[/code]

上面的代码改动的地方只有一点点。首先我们依据clientId对event进行分组,将单一的Observable流分割成多个流,每个名为byClient的子流都代表着拥有相同clientId的event。现在如果我们对子流进行映射,我们能够确定有相同clientId的event是绝不会并发地被处理的。输出流是惰性的,所以我们必须对流调用subscribe。与其对每一个event单独地调用subscribe,我们选择将每一秒内处理的event收集起来并对其计数。这样一来每秒我们接收到的就是一个Integer类型的event,它代表着每秒内我们处理的event数量。

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state

现在我们必须除去重复的UUID,最简单也是最笨的做法就是利用全局状态。我们能够简单地利用filter()在cache中查找重复的event:

[code lang=”java”]
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.build();

es.observe()
.filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
.doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
.subscribe(
clientProjection::consume,
e -> log.error("Fatal error", e)
);
[/code]

如果你想要监控上面代码的效果可以简单的加入一个度量器:

[code lang=”java”]
Meter duplicates = metricRegistry.meter("duplicates");

es.observe()
.filter(e -> {
if (seenUuids.getIfPresent(e.getUuid()) != null) {
duplicates.mark();
return false;
} else {
return true;
}
})
[/code]

在操作符内部访问全局的、尤其是可变的状态时是非常危险的,并且这样会破坏RxJava唯一的目的-简单并发。虽然我们使用的是Guava中线程安全的Cache,但在很多情况下你很容易会忘记这个全局共享的可变状态是可以被多个线程访问的,如果你发现你在操作符链中修改外部的一些变量的话,那就要非常小心了。

Custom distinct() operator in RxJava 1.x

RxJava 1.x有一个distinct()运算函数,它大概可以做如下的工作:

[code lang=”java”]
es.observe()
.distinct(Event::getUuid)
.groupBy(Event::getClientId)
[/code]

不幸的是distinct()会在内部将所有的UUID都存储在一个不断增长的HashSet里面,但我们只关心10s内的重复事件。通过复制粘贴DistinctOperator的实现,我创造了DistinctEvent操作符,它利用了Guava的cache仅仅只存储10s内的UUID。我故意将Event硬编码在这个操作符内而不是将它写成一般性的就是为了让代码更易懂:

[code lang=”java”]
class DistinctEvent implements Observable.Operator<Event, Event> {
private final Duration duration;

DistinctEvent(Duration duration) {
this.duration = duration;
}

@Override
public Subscriber<? super Event> call(Subscriber<? super Event> child) {
return new Subscriber<Event>(child) {
final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
.expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
.<UUID, Boolean>build().asMap();

@Override
public void onNext(Event event) {
if (keyMemory.put(event.getUuid(), true) == null) {
child.onNext(event);
} else {
request(1);
}
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onCompleted() {
child.onCompleted();
}

};
}
}
[/code]

自定义的操作符使用起来非常简单,实现如下:

[code lang=”java”]
es.observe()
.lift(new DistinctEvent(Duration.ofSeconds(10)))
.groupBy(Event::getClientId)
.flatMap(byClient -> byClient
.observeOn(Schedulers.io())
.map(clientProjection::consume)
)
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.subscribe(
c -> log.info("Processed {} events/s", c),
e -> log.error("Fatal error", e)
);
[/code]

事实上如果我们跳过每秒的logging实现可以变得更加简单:

[code lang=”java”]
es.observe()
.lift(new DistinctEvent(Duration.ofSeconds(10)))
.groupBy(Event::getClientId)
.flatMap(byClient -> byClient
.observeOn(Schedulers.io())
.map(clientProjection::consume)
)
.subscribe(
e -> {},
e -> log.error("Fatal error", e)
);
[/code]

这个方案比之前的基于线程池和装饰者模式的要更加简短,其中唯一麻烦的部分就是在自定义的操作符中当存储了太多的UUID之后会造成内存泄漏,幸好RxJava 2能解决这个问题。

RxJava 2.x and more powerful built-in distinct()

distinct()允许使用自定义的Collection而不必使用内置的HashSet(感觉2.x中可以使用自定义的数据结构后,1.x中的DistinctEvent就完全没必要了)。不管你是否相信,依赖倒置不仅仅只出现在Spring框架或者Java EE中。当一个库允许你提供它内部数据结构的自定义实现时,这就已经是依赖反转。首先我创造了一个辅助方法,它能够建立Set,Set由Map提供依赖,而Map则由Cache提供依赖。这就像委托一样!

[code lang=”java”]
private Set<UUID> recentUuids() {
return Collections.newSetFromMap(
CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.<UUID, Boolean>build()
.asMap()
);
}
[/code]

有了这个方法之后,我们就能利用以下的代码实现整个任务:

[code lang=”java”]
es.observe()
.distinct(Event::getUuid, this::recentUuids)
.groupBy(Event::getClientId)
.flatMap(byClient -> byClient
.observeOn(Schedulers.io())
.map(clientProjection::consume)
)
.subscribe(
e -> {},
e -> log.error("Fatal error", e)
);
[/code]

这段代码是如此的优雅、简单、清晰!它的大致流程如下:

  • observe一个event流
  • 消除重复的UUID
  • 依据clientId对event分组
  • 对每一个client有序地处理event

希望你能喜欢这些方案,并能将之运用到你的日常生活中去。

See also:

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 小规模的流处理框架.Part 2: RxJava 1.x/2.x

  • Trackback 关闭
  • 评论 (1)
    • v5mark
    • 2016/12/24 10:33上午

    mark

return top