grpc双方向チャネル作成チャットルーム
40009 ワード
grpc
双方向チャネルの特性を利用して、簡易なチャットルームを作成します.サービス側:go
クライアント:goとpython
go言語依存https://blog.csdn.net/sunt2018/article/details/106630815
python grpc入門https://blog.csdn.net/sunt2018/article/details/90176015
grpc protobuf契約プロトコル
protoes/hello.proto
syntax = "proto3";
package protoes;
import "google/protobuf/timestamp.proto";
service OnLineChat {
rpc SayHi(stream HiRequest) returns (stream HiReply) {};
}
message HiRequest {
string message = 1;
}
message HiReply {
string message = 1;
google.protobuf.Timestamp TS = 2;
MessageType message_type = 3;
enum MessageType{
CONNECT_SUCCESS = 0;
CONNECT_FAILED = 1;
NORMAL_MESSAGE = 2;
}
}
goサービス側
package main
import (
"github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
pb "grcp_chat_room/protoes"
"log"
"net"
"os"
"sync"
"time"
"fmt"
)
// , ( , ), grpc stream { name : stream }
type ConnectPool struct {
sync.Map
}
// , Get,Add,Del BroadCast( , )
func (p *ConnectPool) Get(name string) pb.OnLineChat_SayHiServer{
if stream, ok := p.Load(name); ok {
return stream.(pb.OnLineChat_SayHiServer)
} else {
return nil
}
}
func (p *ConnectPool) Add(name string, stream pb.OnLineChat_SayHiServer) {
p.Store(name, stream)
}
func (p *ConnectPool) Del(name string) {
p.Delete(name)
}
//
func (p *ConnectPool) BroadCast(from, message string) {
log.Printf("BroadCast from: %s, message: %s
", from, message)
p.Range(func(username_i, stream_i interface{}) bool {
username := username_i.(string)
stream := stream_i.(pb.OnLineChat_SayHiServer)
if username == from {
return true
} else {
stream.Send(&pb.HiReply{
Message: message,
MessageType: pb.HiReply_NORMAL_MESSAGE, // 2.
TS: ×tamp.Timestamp{Seconds: time.Now().Unix()},
})
}
return true
})
}
var connect_pool *ConnectPool
//
type Service struct{}
func (s *Service) SayHi(stream pb.OnLineChat_SayHiServer) error {
peer, _ := peer.FromContext(stream.Context())
log.Printf("Received new connection. %s", peer.Addr.String())
md, _ := metadata.FromIncomingContext(stream.Context())
username := md["name"][0] // metadata ,
if connect_pool.Get(username) != nil {
stream.Send(&pb.HiReply{
Message: fmt.Sprintf("username %s already exists!", username),
MessageType: pb.HiReply_CONNECT_FAILED, // 1. ,
})
return nil
} else { //
connect_pool.Add(username, stream)
stream.Send(&pb.HiReply{
Message: fmt.Sprintf("Connect success!"),
MessageType: pb.HiReply_CONNECT_SUCCESS, // 0
})
}
go func() {
// ,
stream.Context().Done()
connect_pool.Del(username)
connect_pool.BroadCast(username, fmt.Sprintf("%s leval room", username))
}()
// ,xxxx
connect_pool.BroadCast(username, fmt.Sprintf("Welcome %s!", username))
//
for {
req, err := stream.Recv()
if err != nil {
return err
}
connect_pool.BroadCast(username, fmt.Sprintf("%s: %s", username, req.Message))
}
return nil
}
func GetListen() string {
if len(os.Args) < 2 {
return ":9999"
}
return os.Args[1]
}
func main() {
connect_pool = &ConnectPool{}
// :
address, err := net.Listen("tcp", GetListen())
if err != nil {
panic(err)
}
// grpc
ser := grpc.NewServer()
pb.RegisterOnLineChatServer(ser, &Service{}) // protoes ,
//
if err := ser.Serve(address); err != nil {
panic(err)
}
}
go client
package main
import (
"bufio"
"context"
"flag"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb "grcp_chat_room/protoes"
)
// client.exe -name xxxx -address xxxxx
var name *string = flag.String("name", "guess", "what's your name?")
var address *string = flag.String("address", "127.0.0.1:8881", "server address")
var mutex sync.Mutex
// , print
func ConsoleLog(message string) {
mutex.Lock()
defer mutex.Unlock()
fmt.Printf("
------ %s -----
%s
> ", time.Now(), message)
}
//
func Input(prompt string) string {
fmt.Print(prompt)
reader := bufio.NewReader(os.Stdin)
line, _, err := reader.ReadLine()
if err != nil {
if err == io.EOF {
return ""
} else {
panic(err)
}
}
return string(line)
}
func main() {
//
flag.Parse()
// ,
conn, err := grpc.Dial("localhost:9999", grpc.WithInsecure())
if err != nil {
log.Printf(" : [%v] ", err)
return
}
defer conn.Close()
//
client := pb.NewOnLineChatClient(conn)
// context
//ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("name", *name))
//
stream, err := client.SayHi(ctx)
if err != nil {
log.Printf(" : [%v] ", err)
}
//
connected := make(chan bool)
//
go func() {
var (
reply *pb.HiReply
err error
)
for {
reply, err = stream.Recv()
if err != nil{
panic(err)
}
ConsoleLog(reply.Message)
if reply.MessageType == pb.HiReply_CONNECT_FAILED { // code=1
cancel()
break
}
if reply.MessageType == pb.HiReply_CONNECT_SUCCESS { // code=0
connected true
}
// if , , code=2
}
}()
go func() {
connected
var (
line string
err error
)
for {
line = Input("")
if line == "exit" {
cancel()
break
}
err = stream.Send(&pb.HiRequest{
Message: line,
})
fmt.Print("> ")
if err != nil{
panic(err)
}
}
}()
ctx.Done()
fmt.Println("Bye")
}
python client
import time
from time import strftime, localtime
import grpc
from protoes import hello_pb2, hello_pb2_grpc
def gen():
while 1:
i = str(input(">"))
if i == "q":
break
yield hello_pb2.HiRequest(message=i)
time.sleep(0.1)
def run():
channel = grpc.insecure_channel("127.0.0.1:9999")
stub = hello_pb2_grpc.OnLineChatStub(channel)
# metadata python
metadata=(("name","python"),)
it = stub.SayHi(gen(),metadata=metadata)
for r in it:
ztime = strftime("%Y-%m-%d %H:%M:%S", localtime())
print(f"
------ {ztime} -----
{r.message}
>", end = "")
# print(dir(r))
if __name__ == '__main__':
run()