grpc双方向チャネル作成チャットルーム

40009 ワード

grpc双方向チャネルの特性を利用して、簡易なチャットルームを作成します.
サービス側:go
クライアント:goとpython
go言語依存https://blog.csdn.net/sunt2018/article/details/106630815
python grpc入門https://blog.csdn.net/sunt2018/article/details/90176015
grpc protobuf契約プロトコルprotoes/hello.proto
syntax = "proto3";

package protoes;

import "google/protobuf/timestamp.proto";

service OnLineChat {
  rpc SayHi(stream HiRequest) returns (stream HiReply) {};
}

message HiRequest {
  string message = 1;
}

message HiReply {
  string message = 1;
  google.protobuf.Timestamp TS = 2;
  MessageType message_type = 3;

  enum MessageType{
    CONNECT_SUCCESS = 0;
    CONNECT_FAILED = 1;
    NORMAL_MESSAGE = 2;
  }
}

goサービス側
package main

import (
	"github.com/golang/protobuf/ptypes/timestamp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	pb "grcp_chat_room/protoes"
	"log"
	"net"
	"os"
	"sync"
	"time"
	"fmt"

)


//      ,    (  ,   ),    grpc stream   { name : stream }
type ConnectPool struct {
	sync.Map
}

//          ,    Get,Add,Del BroadCast(    ,  )
func (p *ConnectPool) Get(name string) pb.OnLineChat_SayHiServer{
	if stream, ok := p.Load(name); ok {
		return stream.(pb.OnLineChat_SayHiServer)
	} else {
		return nil
	}
}

func (p *ConnectPool) Add(name string, stream pb.OnLineChat_SayHiServer) {
	p.Store(name, stream)
}

func (p *ConnectPool) Del(name string) {
	p.Delete(name)
}

//        
func (p *ConnectPool) BroadCast(from, message string) {
	log.Printf("BroadCast from: %s, message: %s
"
, from, message) p.Range(func(username_i, stream_i interface{}) bool { username := username_i.(string) stream := stream_i.(pb.OnLineChat_SayHiServer) if username == from { return true } else { stream.Send(&pb.HiReply{ Message: message, MessageType: pb.HiReply_NORMAL_MESSAGE, // 2. TS: &timestamp.Timestamp{Seconds: time.Now().Unix()}, }) } return true }) } var connect_pool *ConnectPool // type Service struct{} func (s *Service) SayHi(stream pb.OnLineChat_SayHiServer) error { peer, _ := peer.FromContext(stream.Context()) log.Printf("Received new connection. %s", peer.Addr.String()) md, _ := metadata.FromIncomingContext(stream.Context()) username := md["name"][0] // metadata , if connect_pool.Get(username) != nil { stream.Send(&pb.HiReply{ Message: fmt.Sprintf("username %s already exists!", username), MessageType: pb.HiReply_CONNECT_FAILED, // 1. , }) return nil } else { // connect_pool.Add(username, stream) stream.Send(&pb.HiReply{ Message: fmt.Sprintf("Connect success!"), MessageType: pb.HiReply_CONNECT_SUCCESS, // 0 }) } go func() { // , stream.Context().Done() connect_pool.Del(username) connect_pool.BroadCast(username, fmt.Sprintf("%s leval room", username)) }() // ,xxxx connect_pool.BroadCast(username, fmt.Sprintf("Welcome %s!", username)) // for { req, err := stream.Recv() if err != nil { return err } connect_pool.BroadCast(username, fmt.Sprintf("%s: %s", username, req.Message)) } return nil } func GetListen() string { if len(os.Args) < 2 { return ":9999" } return os.Args[1] } func main() { connect_pool = &ConnectPool{} // : address, err := net.Listen("tcp", GetListen()) if err != nil { panic(err) } // grpc ser := grpc.NewServer() pb.RegisterOnLineChatServer(ser, &Service{}) // protoes , // if err := ser.Serve(address); err != nil { panic(err) } }

go client
package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	pb "grcp_chat_room/protoes"
)

// client.exe -name xxxx -address xxxxx        
var name *string = flag.String("name", "guess", "what's your name?")
var address *string = flag.String("address", "127.0.0.1:8881", "server address")
var mutex sync.Mutex

//          ,         print  
func ConsoleLog(message string) {
	mutex.Lock()
	defer mutex.Unlock()
	fmt.Printf("
------ %s -----
%s
> "
, time.Now(), message) } // func Input(prompt string) string { fmt.Print(prompt) reader := bufio.NewReader(os.Stdin) line, _, err := reader.ReadLine() if err != nil { if err == io.EOF { return "" } else { panic(err) } } return string(line) } func main() { // flag.Parse() // , conn, err := grpc.Dial("localhost:9999", grpc.WithInsecure()) if err != nil { log.Printf(" : [%v] ", err) return } defer conn.Close() // client := pb.NewOnLineChatClient(conn) // context //ctx := context.Background() ctx, cancel := context.WithCancel(context.Background()) ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("name", *name)) // stream, err := client.SayHi(ctx) if err != nil { log.Printf(" : [%v] ", err) } // connected := make(chan bool) // go func() { var ( reply *pb.HiReply err error ) for { reply, err = stream.Recv() if err != nil{ panic(err) } ConsoleLog(reply.Message) if reply.MessageType == pb.HiReply_CONNECT_FAILED { // code=1 cancel() break } if reply.MessageType == pb.HiReply_CONNECT_SUCCESS { // code=0 connected true } // if , , code=2 } }() go func() { connected var ( line string err error ) for { line = Input("") if line == "exit" { cancel() break } err = stream.Send(&pb.HiRequest{ Message: line, }) fmt.Print("> ") if err != nil{ panic(err) } } }() ctx.Done() fmt.Println("Bye") }

python client
import time
from time import strftime, localtime
import grpc
from protoes import hello_pb2, hello_pb2_grpc



def gen():
    while 1:
        i = str(input(">"))
        if i == "q":
            break
        yield hello_pb2.HiRequest(message=i)
        time.sleep(0.1)

def run():
    channel = grpc.insecure_channel("127.0.0.1:9999")
    stub = hello_pb2_grpc.OnLineChatStub(channel)
	
    #   metadata       python
    metadata=(("name","python"),)

    it = stub.SayHi(gen(),metadata=metadata)
    for r in it:
        ztime = strftime("%Y-%m-%d %H:%M:%S", localtime())
        print(f"
------ {ztime} -----
{r.message}
>"
, end = "") # print(dir(r)) if __name__ == '__main__': run()