0. 概述
在学习完 GRPC 系列 之后,不知道你对 GRPC 的 Stream 机制的感觉是如何?会不会感觉似乎很难找到用武之地?本文就将通过 GRPC 的 Stream 机制来实现一个发布/订阅模型,带你了解一下 GRPC 的一个实际应用。
本文的所有代码都可以在我的代码仓库中下载:代码链接,并且我已经提供好了 Makefile,你可以简单通过 Make 命令进行尝试。具体的 Make 命令,可以参见最终小结的说明。
1. 本地的发布订阅
在开始 GRPC 的发布订阅之前,我们先来了解一个本地如何实现发布订阅模型,可能你有很多种方法可以实现,但是,我这里演示的是通过 moby(Docker 社区版)的 pubsub 包,然后这里就简单得演示一下:
[root@liqiang.io]# cat local_pubsub.go
import (
"fmt"
"strings"
"time"
"github.com/moby/moby/pkg/pubsub"
)
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "golang:") {
return true
}
}
return false
})
go p.Publish("golang: https://golang.org")
time.Sleep(1)
go func() {
fmt.Println("golang topic:", <-golang)
}()
<-make(chan bool)
}
这里就简化处理一下,这里初始化了一个 publisher 对象,然后开启一个 golang 的 Subscribe,然后就是一个发布者(p.Publish)进行发布,然后被 Subscribe 收到,并打印出来,因为这里有一个死锁,所以,最终运行的结果会报错,但是不影响对这个模型的理解:
[root@liqiang.io]# go run local_pubsub.go
golang topic: golang: https://golang.org
... ...
OK,接下来我就将继续这个本地模型,将他迁移到 GRPC 的发布订阅模型中,但是,要实现 GRPC,先进行 proto 定义。
3. proto 定义
这里我定义一个发布订阅服务的 proto,这个发布订阅服务应该有两个方法,分别是:
- 发布:这是个普通函数,表示向某个频道发布一个消息
- 订阅:这个是个 Stream 方法,表示我要订阅某个频道,如果这个频道有消息,请通知我
[root@liqiang.io]# cat pubsub.proto
syntax = "proto3";
import "google/protobuf/empty.proto";
message Channel {
string name = 2;
}
message SubscribeResult {
string msg = 2;
}
service PubsubService {
rpc Publish (Channel) returns (google.protobuf.Empty);
rpc Subscribe (Channel) returns (stream SubscribeResult);
}
因为发布函数是简单函数,所以不用返回体,只需要知道发布的状态是否成功即可;而订阅因为是需要实时接收消息的,所以响应是一个 stream。
4. 实现发布订阅模型
proto 既然都定义好了,那么下一步就是实现了。这里实现有一些架构上的要素需要说明,这里实现的对象有:
- 发布订阅服务器:用于接收订阅请求,同时也接收发布请求,并且将发布请求转发给订阅者
- 发布客户端:用于向发布订阅服务器发送消息
- 订阅客户端:用于从发布订阅服务器订阅消息,实时处理消息
4.1 发布订阅服务器
这里的发布订阅服务器其实就是结合 GRPC 服务器和刚才的本地的发布订阅模型,组合起来的代码是这样的:
[root@liqiang.io]# cat pubsub_server.go
type pubSubServer struct {
pub *local_pubsub.Publisher
}
func NewPubsubService() *pubSubServer {
return &pubSubServer{
pub: local_pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
func (p *pubSubServer) Publish(ctx context.Context, req *pubsub.Channel) (*empty.Empty, error) {
p.pub.Publish(req.Name)
return &empty.Empty{}, nil
}
func (p *pubSubServer) Subscribe(channel *pubsub.Channel, stream pubsub.PubsubService_SubscribeServer) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, channel.Name) {
return true
}
}
return false
})
for v := range ch {
if err := stream.Send(&pubsub.SubscribeResult{Msg: v.(string)}); err != nil {
return err
}
}
return nil
}
在服务器内部有一个本地的 pubsub 对象,然后如果有订阅者过来,那么就将订阅者注册到这个本地的 pubsub 服务上;一旦有发布者将消息发布过来,那么就可以通过订阅者的 Stream 将消息传过去。
4.2 订阅客户端
[root@liqiang.io]# cat subscribe_client.go
func main() {
conn, err := grpc.Dial("localhost:8123", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pubsub.NewPubsubServiceClient(conn)
stream, err := client.Subscribe(
context.Background(), &pubsub.Channel{Name: "golang:"},
)
if err != nil {
log.Fatal(err)
}
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.Msg)
}
}
这里其实就是构建一个 GRPC 客户端,然后创建一个订阅请求,接着就等着发布订阅服务器将发布者发布的消息传回来了。
4.3 发布客户端
[root@liqiang.io]# cat publisher.go
func main() {
conn, err := grpc.Dial("localhost:8123", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pubsub.NewPubsubServiceClient(conn)
_, err = client.Publish(
context.Background(), &pubsub.Channel{Name: "golang: hello Go"},
)
if err != nil {
log.Fatal(err)
}
}
发布客户端的逻辑更加得简单,只需要构建一个客户端对象,然后就发布一条消息就好了,都没有服务器的 Stream 信息等,简单快捷。
5. 小结
最终,实现完代码之后,是需要试验一下的,这里除了发布者,其他都是阻塞式的,所以我们需要打开 3 个终端,同时,为了方便使用,我已经将这些代码的运行都添加到 Makefile 里面了,如果你想获取完整代码,可以看开头的概述链接,我已经在代码仓库中放入了所有的代码和运行文件,因此,下面的演示内容我都通过 make 命令来运行:
终端 1
[root@liqiang.io]# make pubsub_server
go run pubsub/server/grpc_pubsub.go
Listen at localhost:8123
终端 2
[root@liqiang.io]# make pubsub_subscriber
go run pubsub/client/golang-subscribe.go
终端 3
[root@liqiang.io]# make pubsub_publisher
go run pubsub/client/publisher.go
然后可以在终端 2 上看到发布者发布的信息了:
[root@liqiang.io]# make pubsub_subscriber
go run pubsub/client/golang-subscribe.go
golang: hello Go
这样,就表示我们根据 GRPC 开发的发布订阅模型是工作的了。