0. 概述

在学习完 GRPC 系列 之后,不知道你对 GRPC 的 Stream 机制的感觉是如何?会不会感觉似乎很难找到用武之地?本文就将通过 GRPC 的 Stream 机制来实现一个发布/订阅模型,带你了解一下 GRPC 的一个实际应用。

本文的所有代码都可以在我的代码仓库中下载:代码链接,并且我已经提供好了 Makefile,你可以简单通过 Make 命令进行尝试。具体的 Make 命令,可以参见最终小结的说明。

1. 本地的发布订阅

在开始 GRPC 的发布订阅之前,我们先来了解一个本地如何实现发布订阅模型,可能你有很多种方法可以实现,但是,我这里演示的是通过 moby(Docker 社区版)的 pubsub 包,然后这里就简单得演示一下:

  1. [root@liqiang.io]# cat local_pubsub.go
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/moby/moby/pkg/pubsub"
  7. )
  8. func main() {
  9. p := pubsub.NewPublisher(100*time.Millisecond, 10)
  10. golang := p.SubscribeTopic(func(v interface{}) bool {
  11. if key, ok := v.(string); ok {
  12. if strings.HasPrefix(key, "golang:") {
  13. return true
  14. }
  15. }
  16. return false
  17. })
  18. go p.Publish("golang: https://golang.org")
  19. time.Sleep(1)
  20. go func() {
  21. fmt.Println("golang topic:", <-golang)
  22. }()
  23. <-make(chan bool)
  24. }

这里就简化处理一下,这里初始化了一个 publisher 对象,然后开启一个 golang 的 Subscribe,然后就是一个发布者(p.Publish)进行发布,然后被 Subscribe 收到,并打印出来,因为这里有一个死锁,所以,最终运行的结果会报错,但是不影响对这个模型的理解:

  1. [root@liqiang.io]# go run local_pubsub.go
  2. golang topic: golang: https://golang.org
  3. ... ...

OK,接下来我就将继续这个本地模型,将他迁移到 GRPC 的发布订阅模型中,但是,要实现 GRPC,先进行 proto 定义。

3. proto 定义

这里我定义一个发布订阅服务的 proto,这个发布订阅服务应该有两个方法,分别是:

  1. [root@liqiang.io]# cat pubsub.proto
  2. syntax = "proto3";
  3. import "google/protobuf/empty.proto";
  4. message Channel {
  5. string name = 2;
  6. }
  7. message SubscribeResult {
  8. string msg = 2;
  9. }
  10. service PubsubService {
  11. rpc Publish (Channel) returns (google.protobuf.Empty);
  12. rpc Subscribe (Channel) returns (stream SubscribeResult);
  13. }

因为发布函数是简单函数,所以不用返回体,只需要知道发布的状态是否成功即可;而订阅因为是需要实时接收消息的,所以响应是一个 stream。

4. 实现发布订阅模型

proto 既然都定义好了,那么下一步就是实现了。这里实现有一些架构上的要素需要说明,这里实现的对象有:

4.1 发布订阅服务器

这里的发布订阅服务器其实就是结合 GRPC 服务器和刚才的本地的发布订阅模型,组合起来的代码是这样的:

  1. [root@liqiang.io]# cat pubsub_server.go
  2. type pubSubServer struct {
  3. pub *local_pubsub.Publisher
  4. }
  5. func NewPubsubService() *pubSubServer {
  6. return &pubSubServer{
  7. pub: local_pubsub.NewPublisher(100*time.Millisecond, 10),
  8. }
  9. }
  10. func (p *pubSubServer) Publish(ctx context.Context, req *pubsub.Channel) (*empty.Empty, error) {
  11. p.pub.Publish(req.Name)
  12. return &empty.Empty{}, nil
  13. }
  14. func (p *pubSubServer) Subscribe(channel *pubsub.Channel, stream pubsub.PubsubService_SubscribeServer) error {
  15. ch := p.pub.SubscribeTopic(func(v interface{}) bool {
  16. if key, ok := v.(string); ok {
  17. if strings.HasPrefix(key, channel.Name) {
  18. return true
  19. }
  20. }
  21. return false
  22. })
  23. for v := range ch {
  24. if err := stream.Send(&pubsub.SubscribeResult{Msg: v.(string)}); err != nil {
  25. return err
  26. }
  27. }
  28. return nil
  29. }

在服务器内部有一个本地的 pubsub 对象,然后如果有订阅者过来,那么就将订阅者注册到这个本地的 pubsub 服务上;一旦有发布者将消息发布过来,那么就可以通过订阅者的 Stream 将消息传过去。

4.2 订阅客户端

  1. [root@liqiang.io]# cat subscribe_client.go
  2. func main() {
  3. conn, err := grpc.Dial("localhost:8123", grpc.WithInsecure())
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer conn.Close()
  8. client := pubsub.NewPubsubServiceClient(conn)
  9. stream, err := client.Subscribe(
  10. context.Background(), &pubsub.Channel{Name: "golang:"},
  11. )
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. for {
  16. reply, err := stream.Recv()
  17. if err != nil {
  18. if err == io.EOF {
  19. break
  20. }
  21. log.Fatal(err)
  22. }
  23. fmt.Println(reply.Msg)
  24. }
  25. }

这里其实就是构建一个 GRPC 客户端,然后创建一个订阅请求,接着就等着发布订阅服务器将发布者发布的消息传回来了。

4.3 发布客户端

  1. [root@liqiang.io]# cat publisher.go
  2. func main() {
  3. conn, err := grpc.Dial("localhost:8123", grpc.WithInsecure())
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. defer conn.Close()
  8. client := pubsub.NewPubsubServiceClient(conn)
  9. _, err = client.Publish(
  10. context.Background(), &pubsub.Channel{Name: "golang: hello Go"},
  11. )
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. }

发布客户端的逻辑更加得简单,只需要构建一个客户端对象,然后就发布一条消息就好了,都没有服务器的 Stream 信息等,简单快捷。

5. 小结

最终,实现完代码之后,是需要试验一下的,这里除了发布者,其他都是阻塞式的,所以我们需要打开 3 个终端,同时,为了方便使用,我已经将这些代码的运行都添加到 Makefile 里面了,如果你想获取完整代码,可以看开头的概述链接,我已经在代码仓库中放入了所有的代码和运行文件,因此,下面的演示内容我都通过 make 命令来运行:

然后可以在终端 2 上看到发布者发布的信息了:

  1. [root@liqiang.io]# make pubsub_subscriber
  2. go run pubsub/client/golang-subscribe.go
  3. golang: hello Go

这样,就表示我们根据 GRPC 开发的发布订阅模型是工作的了。

6. Ref