四种数据流
- 简单模式
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据 - 服务器数据流
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。 - 客户端数据流
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。 - 双向数据流
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。
这里只会讲 grpc 中的 * stream*,srteam 顾名思义 就是 一种 流,可以源源不断的推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。
proto
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| syntax = "proto3"; option go_package = "/.;proto";
service Greeter { rpc GetStream(StreamRequestData) returns (stream StreamResponseData); rpc PutStream(stream StreamRequestData) returns (StreamResponseData); rpc AllStream(stream StreamRequestData) returns (stream StreamResponseData); }
message StreamRequestData { string data = 1; }
message StreamResponseData{ string data = 1; }
|
生成代码
1
| protoc -I . stream.proto --go_out=plugins=grpc:.
|
1
| python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. stream.proto
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| package main
import ( "demo1/grpc_stream/proto" "fmt" "google.golang.org/grpc" "io" "net" "sync" "time" )
const ADDRESS = "localhost:50051"
type Server struct{}
func (s *Server) GetStream(request *proto.StreamRequestData, stream proto.Greeter_GetStreamServer) error { for i := 0; i < 10; i++ { _ = stream.Send(&proto.StreamResponseData{ Data: fmt.Sprintf(request.Data+": %v", time.Now().Unix()), }) time.Sleep(time.Second) } return nil }
func (s *Server) PutStream(stream proto.Greeter_PutStreamServer) error { for { r, err := stream.Recv() if err == io.EOF { fmt.Println("EOF") break } else if err != nil { panic(err) } fmt.Println("客户端流模式:" + r.Data) } return nil }
func (s *Server) AllStream(stream proto.Greeter_AllStreamServer) error { wg := sync.WaitGroup{} wg.Add(2) go func() { for { data, err := stream.Recv() if err == io.EOF { fmt.Println("EOF") break } else if err != nil { panic(err) } fmt.Println("接收到客户端数据:" + data.Data) } wg.Done() }() go func() { for i := 0; i < 10; i++ { err := stream.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("我是服务端%d!", i), }) if err != nil { panic(err) } time.Sleep(time.Second) } wg.Done() }() wg.Wait() return nil }
func main() { listener, err := net.Listen("tcp", ADDRESS) if err != nil { panic("failed Listener:" + err.Error()) } g := grpc.NewServer() proto.RegisterGreeterServer(g, &Server{}) err = g.Serve(listener) if err != nil { panic(err) } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package main
import ( "context" "demo1/grpc_stream/proto" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "io" "sync" "time" )
const ADDRESS = "localhost:50051"
func GetStream(c proto.GreeterClient) { rsp, _ := c.GetStream(context.Background(), &proto.StreamRequestData{Data: "likfees"}) for { r, err := rsp.Recv() if err == io.EOF { fmt.Println("EOF") break } else if err != nil { panic(err) } fmt.Println(r.Data) } }
func PutStream(c proto.GreeterClient) { rsp, _ := c.PutStream(context.Background()) for i := 0; i < 10; i++ { _ = rsp.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("%v", time.Now().Unix()), }) time.Sleep(time.Second) } _ = rsp.CloseSend() }
func AllStream(c proto.GreeterClient) { conn, err := c.AllStream(context.Background()) if err != nil { panic("the connect failed! " + err.Error()) } wg := sync.WaitGroup{} wg.Add(2) go func() { for { r, err := conn.Recv() if err == io.EOF { fmt.Println("EOF") break } else if err != nil { panic(err) } fmt.Println("收到服务器发送数据:" + r.Data) } wg.Done() }() go func() { for i := 0; i < 10; i++ { err = conn.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("我是客户端%v!", i), }) if err != nil { panic(err) } time.Sleep(time.Second) } _ = conn.CloseSend() wg.Done() }() wg.Wait() }
func main() { conn, err := grpc.Dial(ADDRESS, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic("the connect failed! " + err.Error()) } defer conn.Close() c := proto.NewGreeterClient(conn) GetStream(c) PutStream(c) AllStream(c) }
|
![image-20220127162356072]()