Grpc Streaming 你造?
一、前言
grpc 是一个由 google 推出的、高性能、开源、通用的 rpc 框架。它是基于 HTTP2 协议标准设计开发,默认采用 Protocol Buffers 数据序列化协议,支持多种开发语言。
一般业务场景下,我们都是使用grpc的simple-rpc模式,也就是每次客户端发起请求,服务端会返回一个响应结果的模式。
image.png
但是grpc除了这种一来一往的请求模式外,还有流式模式,下面我们一一道来。
二 grpc服务端流
服务端流模式是说客户端发起一次请求后,服务端在接受到请求后,可以以流的方式,使用同一连接,不断的向客户端写回响应结果,客户端则可以源源不断的接受到服务端写回的数据。
image.png
下面我们通过简单例子,来说明如何使用,服务端端流。
要实现服务端流,需要把grpc方法定义如下:
message Metric { google.protobuf.Timestamp timestamp = 1; int64 metric = 2; } message Average { double val = 1; } service MetricsService { rpc collectServerStream (Metric) returns (stream Average); }
如上rpc方法的返回值类型前添加stream标识 是服务端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase { public StreamObserver<StreamingExample.Average> responseObserverT; /** * 服务端流 * * @param request * @param responseObserver */ @Override public void collectServerStream(com.example.server.streaming.StreamingExample.Metric request, io.grpc.stub.StreamObserver<com.example.server.streaming.StreamingExample.Average> responseObserver) { //保存流式响应对象 this.responseObserverT = responseObserver; }
最后启动服务,并当流式对象不为null时候,写回数据到客户端:
public class MetricsServerServerStream { public static void main(String[] args) throws InterruptedException, IOException { //启动服务 MetricsServiceImpl metricsService = new MetricsServiceImpl(); Server server = ServerBuilder.forPort(8080).addService(metricsService).build(); server.start(); //获取steam响应对象,不断的向客户端写回数据 new Thread(new Runnable() { @Override public void run() { for (; ; ) { if (null != metricsService.responseObserverT) { metricsService.responseObserverT.onNext(StreamingExample.Average.newBuilder() .setVal(new Random(1000).nextDouble()) .build()); System.out.println("send to client"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); server.awaitTermination(); } }
下面我们看客户端代码,客户端代码如下:
public class MetricsClientServerStream { public static void main(String[] args) throws InterruptedException { //获取客户端桩对象 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel); //发起rpc请求,设置StreamObserver用于监听服务器返回结果 stub.collectServerStream(StreamingExample.Metric.newBuilder().setMetric(1L).build(), new StreamObserver<StreamingExample.Average>() { @Override public void onNext(StreamingExample.Average value) { System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal()); } @Override public void onError(Throwable t) { System.out.println("error:" + t.getLocalizedMessage()); } @Override public void onCompleted() { System.out.println("onCompleted:"); } }); }
如上启动客户端后,可以看到StreamObserver的onNext方法会源源不断的接受到服务端返回的数据。
服务端流使用场景:
- 客户端请求一次,但是需要服务端源源不断的返回大量数据时候,比如大批量数据查询的场景。
- 比如客户端订阅服务端的一个服务数据,服务端发现有新数据时,源源不断的吧数据推送给客户端。
三 grpc客户端流
客户端流模式是说客户端发起请求与服务端建立链接后,可以使用同一连接,不断的向服务端传送数据,等客户端把全部数据都传送完毕后,服务端才返回一个请求结果。
image.png
下面我们通过简单例子,来说明如何使用,客户端流。
要实现服务端流,需要把grpc方法定义如下:
service MetricsService { rpc collectClientStream (stream Metric) returns (Average); }
如上rpc方法的入参类型前添加stream标识 是服务端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase { /** * 客户端流 * * @param responseObserver * @return */ @Override public StreamObserver<StreamingExample.Metric> collectClientStream(StreamObserver<StreamingExample.Average> responseObserver) { return new StreamObserver<StreamingExample.Metric>() { private long sum = 0; private long count = 0; @Override public void onNext(StreamingExample.Metric value) { System.out.println("value: " + value); sum += value.getMetric(); count++; } @Override public void onError(Throwable t) { System.out.println("severError:" + t.getLocalizedMessage()); responseObserver.onError(t); } @Override public void onCompleted() { responseObserver.onNext(StreamingExample.Average.newBuilder() .setVal(sum / count) .build()); System.out.println("serverComplete: "); } }; }
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。
下面我们看客户端代码,客户端代码如下:
public class MetricsClient2 { public static void main(String[] args) throws InterruptedException { //1.创建客户端桩 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel); //2.发起请求,并设置结果回调监听 StreamObserver<StreamingExample.Metric> collect = stub.collectClientStream(new StreamObserver<StreamingExample.Average>() { @Override public void onNext(StreamingExample.Average value) { System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal()); } @Override public void onError(Throwable t) { System.out.println("error:" + t.getLocalizedMessage()); } @Override public void onCompleted() { System.out.println("onCompleted:"); } }); //3.使用同一个链接,不断向服务端传送数据 Stream.of(1L, 2L, 3L, 4L,5L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build()) .forEach(metric -> { collect.onNext(metric); System.out.println(metric); }); Thread.sleep(3000); collect.onCompleted(); channel.shutdown().awaitTermination(50, TimeUnit.SECONDS); } }
如上启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端,然后等服务端结束完毕数据后,会计算接受到的数据的平均值,然后把平均值写回客户端。然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值3。
客户端流使用场景:
- 比如数据批量计算场景:如果只用simple rpc的话,服务端就要一次性收到大量数据,并且在收到全部数据之后才能对数据进行计算处理。如果用客户端流 rpc的话,服务端可以在收到一些记录之后就开始处理,也更有实时性。
四 grpc双向流
双向流意味着客户端向服务端发起请求后,客户端可以源源不断向服务端写入数据的同时,服务端可以源源不断向客户端写入数据。
image.png
下面我们通过简单例子,来说明如何使用双向流。
要实现双向流,需要把grpc方法定义如下:
service MetricsService { rpc collectTwoWayStream (stream Metric) returns (stream Average); }
如上rpc方法的入参类型前添加stream标识 是客户端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase { public StreamObserver<StreamingExample.Average> responseObserverT; /** * 双向流 * * @param responseObserver * @return */ @Override public StreamObserver<StreamingExample.Metric> collectTwoWayStream(StreamObserver<StreamingExample.Average> responseObserver) { this.responseObserverT = responseObserver; return new StreamObserver<StreamingExample.Metric>() { private long sum = 0; private long count = 0; @Override public void onNext(StreamingExample.Metric value) { System.out.println("value: " + value); sum += value.getMetric(); count++; } @Override public void onError(Throwable t) { System.out.println("severError:" + t.getLocalizedMessage()); responseObserver.onError(t); } @Override public void onCompleted() { responseObserver.onNext(StreamingExample.Average.newBuilder() .setVal(sum / count) .build()); System.out.println("serverComplete: "); } }; }
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。并且服务端保存了流式对象responseObserverT用来不断的写数据到客户端
双向流使用场景:
- 需要双向数据交互的场景,比如聊天机器人,游戏室等。
五 StreamObserver转换为反应式框架流
StreamObserver是grpc自己定义的一个流式接口,其定义如下:
public interface StreamObserver<V> { void onNext(V var1); void onError(Throwable var1); void onCompleted(); }
grpc虽然提供了流式接口,但是其并没有提供便捷的流操作符,而我们知道Reactor或者Rxjava这些反应式编程框架,本身是提供了丰富便捷的流操作符的。所以我们想看看如何把StreamObserver转换为反应式框架流,由于Reactor是spring5自带的,所以我们看看如何把StreamObserver转换为Reactor的Flux流对象。
转换代码如下:
public class StreamObserverPublisher implements Publisher<StreamingExample.Average>, StreamObserver<StreamingExample.Average> { private Subscriber<? super StreamingExample.Average> subscriber; @Override public void onNext(StreamingExample.Average l) { subscriber.onNext(l); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onCompleted() { subscriber.onComplete(); } @Override public void subscribe(Subscriber<? super StreamingExample.Average> subscriber) { this.subscriber = subscriber; this.subscriber.onSubscribe(new BaseSubscriber() { }); } } public class MetricsClientTwoWay { public static void main(String[] args) throws InterruptedException { //创建客户端桩 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel); //转换StreamObserver流为Flux流 StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher(); Flux<StreamingExample.Average> flux = Flux.from(streamObserverPublisher); //订阅流,缓存,并消费 flux.buffer(4).subscribe(o -> System.out.println("ele:" + o.size())); // must be done before executing the gRPC request //发起rpc请求 StreamObserver<StreamingExample.Metric> collect = stub.collectTwoWayStream(streamObserverPublisher); }
六 总结
grpc除了提供了simple-rpc还提供了双向流操作,大家可以结合自己的业务场景,选择性使用。另外为了使用反应式框架丰富的流操作符,我们可以便捷的把StreamObserver流转换为Flux流。
作者:加多,资深Java , 著《Java并发编程之美》 ,《深度剖析Apache Dubbo核心技术内幕》,《Java异步编程实战》,公众号:技术原始积累
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: Grpc Streaming 你造?
暂无评论