Grpc Streaming 你造?

一、前言

grpc 是一个由 google 推出的、高性能、开源、通用的 rpc 框架。它是基于 HTTP2 协议标准设计开发,默认采用 Protocol Buffers 数据序列化协议,支持多种开发语言。

一般业务场景下,我们都是使用grpc的simple-rpc模式,也就是每次客户端发起请求,服务端会返回一个响应结果的模式。


image.png

但是grpc除了这种一来一往的请求模式外,还有流式模式,下面我们一一道来。

二 grpc服务端流

服务端流模式是说客户端发起一次请求后,服务端在接受到请求后,可以以流的方式,使用同一连接,不断的向客户端写回响应结果,客户端则可以源源不断的接受到服务端写回的数据。


image.png

下面我们通过简单例子,来说明如何使用,服务端端流。
要实现服务端流,需要把grpc方法定义如下:

message Metric {
    google.protobuf.Timestamp timestamp = 1;
    int64 metric = 2;
}

message Average {
    double val = 1;
}

service MetricsService {
    rpc collectServerStream (Metric) returns (stream Average);
}

如上rpc方法的返回值类型前添加stream标识 是服务端流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {

    public StreamObserver<StreamingExample.Average> responseObserverT;

    /**
     * 服务端流
     *
     * @param request
     * @param responseObserver
     */
    @Override
    public void collectServerStream(com.example.server.streaming.StreamingExample.Metric request,
                                    io.grpc.stub.StreamObserver<com.example.server.streaming.StreamingExample.Average> responseObserver) {
        //保存流式响应对象
        this.responseObserverT = responseObserver;
    }

最后启动服务,并当流式对象不为null时候,写回数据到客户端:

public class MetricsServerServerStream {
    public static void main(String[] args) throws InterruptedException, IOException {
        //启动服务
        MetricsServiceImpl metricsService = new MetricsServiceImpl();
        Server server = ServerBuilder.forPort(8080).addService(metricsService).build();
        server.start();

        //获取steam响应对象,不断的向客户端写回数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (; ; ) {
                    if (null != metricsService.responseObserverT) {
                        metricsService.responseObserverT.onNext(StreamingExample.Average.newBuilder()
                                .setVal(new Random(1000).nextDouble())
                                .build());
                        System.out.println("send to client");
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();
        server.awaitTermination();
    }
}

下面我们看客户端代码,客户端代码如下:

public class MetricsClientServerStream {
    public static void main(String[] args) throws InterruptedException {
        //获取客户端桩对象
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
        MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);

        //发起rpc请求,设置StreamObserver用于监听服务器返回结果
        stub.collectServerStream(StreamingExample.Metric.newBuilder().setMetric(1L).build(), new StreamObserver<StreamingExample.Average>() {
            @Override
            public void onNext(StreamingExample.Average value) {
                System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error:" + t.getLocalizedMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted:");

            }
        });
}

如上启动客户端后,可以看到StreamObserver的onNext方法会源源不断的接受到服务端返回的数据。

服务端流使用场景:

  • 客户端请求一次,但是需要服务端源源不断的返回大量数据时候,比如大批量数据查询的场景。
  • 比如客户端订阅服务端的一个服务数据,服务端发现有新数据时,源源不断的吧数据推送给客户端。

三 grpc客户端流

客户端流模式是说客户端发起请求与服务端建立链接后,可以使用同一连接,不断的向服务端传送数据,等客户端把全部数据都传送完毕后,服务端才返回一个请求结果。


image.png

下面我们通过简单例子,来说明如何使用,客户端流。
要实现服务端流,需要把grpc方法定义如下:

service MetricsService {
    rpc collectClientStream (stream Metric) returns (Average);
}

如上rpc方法的入参类型前添加stream标识 是服务端流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {

    /**
     * 客户端流
     *
     * @param responseObserver
     * @return
     */
    @Override
    public StreamObserver<StreamingExample.Metric> collectClientStream(StreamObserver<StreamingExample.Average> responseObserver) {
        return new StreamObserver<StreamingExample.Metric>() {
            private long sum = 0;
            private long count = 0;

            @Override
            public void onNext(StreamingExample.Metric value) {
                System.out.println("value: " + value);
                sum += value.getMetric();
                count++;
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("severError:" + t.getLocalizedMessage());
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(StreamingExample.Average.newBuilder()
                        .setVal(sum / count)
                        .build());
                System.out.println("serverComplete: ");

            }
        };
    }

如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。

下面我们看客户端代码,客户端代码如下:

public class MetricsClient2 {
    public static void main(String[] args) throws InterruptedException {
        //1.创建客户端桩
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
        MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);

        //2.发起请求,并设置结果回调监听
        StreamObserver<StreamingExample.Metric> collect = stub.collectClientStream(new StreamObserver<StreamingExample.Average>() {
            @Override
            public void onNext(StreamingExample.Average value) {
                System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error:" + t.getLocalizedMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted:");

            }
        });

        //3.使用同一个链接,不断向服务端传送数据
        Stream.of(1L, 2L, 3L, 4L,5L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build())
                .forEach(metric -> {
                    collect.onNext(metric);
                    System.out.println(metric);
                });

        Thread.sleep(3000);
        collect.onCompleted();
        channel.shutdown().awaitTermination(50, TimeUnit.SECONDS);
    }
}

如上启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端,然后等服务端结束完毕数据后,会计算接受到的数据的平均值,然后把平均值写回客户端。然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值3。

客户端流使用场景:

  • 比如数据批量计算场景:如果只用simple rpc的话,服务端就要一次性收到大量数据,并且在收到全部数据之后才能对数据进行计算处理。如果用客户端流 rpc的话,服务端可以在收到一些记录之后就开始处理,也更有实时性。

四 grpc双向流

双向流意味着客户端向服务端发起请求后,客户端可以源源不断向服务端写入数据的同时,服务端可以源源不断向客户端写入数据。


image.png

下面我们通过简单例子,来说明如何使用双向流。
要实现双向流,需要把grpc方法定义如下:

service MetricsService {
    rpc collectTwoWayStream (stream Metric) returns (stream Average);
}

如上rpc方法的入参类型前添加stream标识 是客户端流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {


    public StreamObserver<StreamingExample.Average> responseObserverT;

    /**
     * 双向流
     *
     * @param responseObserver
     * @return
     */
    @Override
    public StreamObserver<StreamingExample.Metric> collectTwoWayStream(StreamObserver<StreamingExample.Average> responseObserver) {
        this.responseObserverT = responseObserver;
        return new StreamObserver<StreamingExample.Metric>() {
            private long sum = 0;
            private long count = 0;

            @Override
            public void onNext(StreamingExample.Metric value) {
                System.out.println("value: " + value);
                sum += value.getMetric();
                count++;
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("severError:" + t.getLocalizedMessage());
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(StreamingExample.Average.newBuilder()
                        .setVal(sum / count)
                        .build());
                System.out.println("serverComplete: ");

            }
        };
    }

如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。并且服务端保存了流式对象responseObserverT用来不断的写数据到客户端

双向流使用场景:

  • 需要双向数据交互的场景,比如聊天机器人,游戏室等。

五 StreamObserver转换为反应式框架流

StreamObserver是grpc自己定义的一个流式接口,其定义如下:

public interface StreamObserver<V> {
    void onNext(V var1);

    void onError(Throwable var1);

    void onCompleted();
}

grpc虽然提供了流式接口,但是其并没有提供便捷的流操作符,而我们知道Reactor或者Rxjava这些反应式编程框架,本身是提供了丰富便捷的流操作符的。所以我们想看看如何把StreamObserver转换为反应式框架流,由于Reactor是spring5自带的,所以我们看看如何把StreamObserver转换为Reactor的Flux流对象。

转换代码如下:

public class StreamObserverPublisher implements Publisher<StreamingExample.Average>, StreamObserver<StreamingExample.Average> {

    private Subscriber<? super StreamingExample.Average> subscriber;

    @Override
    public void onNext(StreamingExample.Average l) {
        subscriber.onNext(l);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onCompleted() {
        subscriber.onComplete();
    }

    @Override
    public void subscribe(Subscriber<? super StreamingExample.Average> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new BaseSubscriber() {
        });
    }
}



public class MetricsClientTwoWay {
    public static void main(String[] args) throws InterruptedException {
        //创建客户端桩
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
        MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);

        //转换StreamObserver流为Flux流
        StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
        Flux<StreamingExample.Average> flux = Flux.from(streamObserverPublisher);
        //订阅流,缓存,并消费
        flux.buffer(4).subscribe(o -> System.out.println("ele:" + o.size())); // must be done before executing the gRPC request

        //发起rpc请求
        StreamObserver<StreamingExample.Metric> collect = stub.collectTwoWayStream(streamObserverPublisher);
}

六 总结

grpc除了提供了simple-rpc还提供了双向流操作,大家可以结合自己的业务场景,选择性使用。另外为了使用反应式框架丰富的流操作符,我们可以便捷的把StreamObserver流转换为Flux流。

作者:加多,资深Java , 著《Java并发编程之美》 ,《深度剖析Apache Dubbo核心技术内幕》,《Java异步编程实战》,公众号:技术原始积累

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: Grpc Streaming 你造?

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top