diff --git a/go.mod b/go.mod index ea7dcb76a2f91a35efc1135c93642c5ca6a5687c..f783f7548c1b0dbd35da2f88ed3e3de300c3e568 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308 // indirect github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/sirupsen/logrus v1.7.0 // indirect diff --git a/go.sum b/go.sum index b45efd769633f189f331c837e1fa4cb2496d54c1..9a561208f6bc5ca13c26ab93a051ec7f48751e80 100644 --- a/go.sum +++ b/go.sum @@ -131,6 +131,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= +github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308 h1:JfSau4YABtkm5gRtFWuRWHT2Lsw4ZbyB4F/qORwf+BA= +github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308/go.mod h1:hxhFuMswfNko9fAxYeqBapfUdJHAgDafBs/MzOZh0X8= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= diff --git a/main.go b/main.go index bd1e7b965efadf946e1cd2d7f773fd452d98d028..94e18a4898abbe56e879b2285a9e81367fe448a9 100644 --- a/main.go +++ b/main.go @@ -1,859 +1,26 @@ package main import ( - "context" - "flag" - "fmt" - "github.com/go-redis/redis/v8" - "hilo-userProxy/common" - "hilo-userProxy/common/config" - "net" + "github.com/pretty66/websocketproxy" "net/http" - _ "net/http/pprof" - "runtime/debug" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "hilo-userProxy/common/mylogrus" - "hilo-userProxy/protocol/userCenter" - "hilo-userProxy/protocol/userProxy" - - "github.com/golang/protobuf/proto" - "github.com/gorilla/websocket" - uuid "github.com/satori/go.uuid" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/resolver" -) - -// 序号号,一个进程一个 -var serialNum uint64 = 0 - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: checkOrigin, -} - -type ProtoData struct { - MsgType uint32 - Data []byte -} - -var myAddress = "" -var m sync.RWMutex -var userChan = make(map[uint64]map[string]chan ProtoData) - -/* 其中CheckOringin是一个函数,该函数用于拦截或放行跨域请求。函数返回值为bool类型,即true放行,false拦截。 -如果请求不是跨域请求可以不赋值,我这里是跨域请求并且为了方便直接返回true -*/ -func checkOrigin(r *http.Request) bool { - return true -} - -const ( - notLoginLimit = 10.0 - heartBeatLossLimit = 30.0 -) - -type ConnectionInfo struct { - RemoteAddr string `json:"remote_addr"` - ConnectTime time.Time - HeartbeatTime time.Time - IsLogin bool - Uid uint64 `json:"uid"` - MsgId uint64 `json:"msg_id"` - ExternalId string `json:"external_id"` - Token string `json:"token"` -} - -func setupRoutes() { - http.HandleFunc("/", homePage) - http.HandleFunc("/ws", serverWebsocket) -} - -func getShortToken(token string) string { - s := strings.Split(token, ".") - if len(s) == 0 { - return token - } else { - return s[len(s)-1] - } -} - -func serverWebsocket(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - mylogrus.MyLog.Errorf("upgrade err:%v", err) - return - } - - now := time.Now() - ci := ConnectionInfo{ - RemoteAddr: ws.RemoteAddr().String(), - ConnectTime: now, - HeartbeatTime: now, - IsLogin: false, - Uid: 0, - MsgId: 0, - ExternalId: "", - Token: "", - } - traceId := uuid.NewV4().String() - logger := mylogrus.MyLog.WithFields(log.Fields{ - "addr": ci.RemoteAddr, - "token": ci.Token, - "traceId": traceId, - }) - logger.Infof("Websocket client successfully Connected at %s", ci.ConnectTime.String()) - - done := make(chan struct{}) - writeChan := make(chan ProtoData, 1000) - - ticker := time.NewTicker(time.Second * 2) - defer ticker.Stop() - - go func() { - defer common.CheckGoPanic() - defer ws.Close() - defer close(done) - //defer close(writeChan) // 读断了就把写也断了 - for { - messageType, message, err := ws.ReadMessage() - if err != nil { - logger.Infof("read error:%v", err) - logger = logger.WithFields(log.Fields{ - "read_err": err.Error(), - }) - break - } - - if messageType != websocket.BinaryMessage { - logger.Errorf("Unexpected messageType %d", messageType) - continue - } - - msgType, msgId, timeStamp, pbData, err := common.DecodeMessage(message) - if err != nil { - logger.Errorf("Decode error %s", err.Error()) - continue - } - - if msgId <= ci.MsgId { - logger.Infof("Discard outdated message msgId = %d <= %d, msgType = %d", msgId, ci.MsgId, msgType) - } else { - ci.MsgId = msgId - - // 统计时延 - now := time.Now() - us := now.UnixNano()/1000 - int64(timeStamp) - if us > 200000 { - //logger.Infof("Message take %d us to come here.", us) - } - - if msgType == common.MsgTypeLogin { - if ci.IsLogin { - logger.Infof("Connection already logged in %+v", ci) - } else { - var rsp *userProxy.LoginRsp - ci.Token, ci.Uid, rsp, err = processLogin(logger, ci.RemoteAddr, pbData) - if err == nil && rsp != nil { - logger = logger.WithFields(log.Fields{ - "addr": ci.RemoteAddr, - "userId": ci.Uid, - "sToken": getShortToken(ci.Token), - }) - - if buffer, err := proto.Marshal(rsp); err == nil { - writeChan <- ProtoData{ - MsgType: common.MsgTypeLoginRsp, - Data: buffer, - } - } - if rsp.Status == common.Login_success { - ci.IsLogin = true - ci.HeartbeatTime = now - - //logger.Infof("Bind to channel %v", writeChan) - setUserChan(ci.Uid, ci.RemoteAddr, writeChan) - } - } - } - } else if msgType == common.MsgTypeHeartBeat { - status, extUid, err := processHeartBeat(pbData) - //logger.Infof("heartbeat") - if err == nil { - ci.HeartbeatTime = time.Now() - if len(ci.ExternalId) == 0 { - //logger.Infof("Received first heartbeat") - - ci.ExternalId = extUid - - logger = logger.WithFields(log.Fields{ - "addr": ci.RemoteAddr, - "userId": ci.Uid, - "sToken": getShortToken(ci.Token), - "externalId": ci.ExternalId, - }) - } - msg := &userProxy.HeartBeatRsp{ - Status: status, - } - if buffer, err := proto.Marshal(msg); err == nil { - writeChan <- ProtoData{ - MsgType: common.MsgTypeHeartBeatRsp, - Data: buffer, - } - } - } - } else if msgType == common.MsgTypeGlobalGiftBannerRsp { - logger = logger.WithFields(log.Fields{ - "addr": ci.RemoteAddr, - "userId": ci.Uid, - "sToken": getShortToken(ci.Token), - "externalId": ci.ExternalId, - }) - rsp := &userProxy.GlobalGiftBannerRsp{} - err := proto.Unmarshal(pbData, rsp) - if err == nil { - //logger.Infof("GlobalGiftBannerRsp, msgType = %d, %v", msgType, rsp) - } else { - logger.Errorf("Unmarshal error") - } - } else if msgType == common.MsgTypeBiz { - logger = logger.WithFields(log.Fields{ - "addr": ci.RemoteAddr, - "userId": ci.Uid, - }) - rsp, err := processBizRequest(logger, ci.Uid, pbData) - if err == nil && rsp != nil { - //logger.Infof("processBizRequest rsp %+v", rsp) - - if buffer, err := proto.Marshal(rsp); err == nil { - writeChan <- ProtoData{ - MsgType: common.MsgTypeBizRsp, - Data: buffer, - } - } - } - } else if msgType == common.MsgTypeEnterRoom { - if err := processEnterRoom(logger, ci.Uid, pbData); err != nil { - logger.Errorf("processEnterRoom fail:%v", err) - } - } else if msgType == common.MsgTypeLeaveRoom { - if err := processLeaveRoom(logger, ci.Uid, pbData); err != nil { - logger.Errorf("processLeaveRoom fail:%v", err) - } - } else if msgType == common.MsgTypeRoomHeartbeat { - if err := processRoomHeartbeat(logger, ci.Uid, pbData); err != nil { - logger.Errorf("processRoomHeartbeat fail:%v", err) - } - } else if msgType == common.MsgTypeGroupMicChangeRsp { - if err := processGroupMicChangeRsp(logger, ci.Uid, pbData); err != nil { - logger.Errorf("processGroupMicChangeRsp fail:%v", err) - } - } else { - logger.Warnf("Unknown message type %d", msgType) - } - - if err != nil { - logger.Infof("process message error %s", err.Error()) - } - } - } - logger.Infof("exiting read loop for token %s, user %d", ci.Token, ci.Uid) - removeUserChan(ci.Uid, ci.RemoteAddr) - - //b, _ := json.Marshal(ci) - //logger.Infof("exiting read loop for %s, size = %d", string(b), sz) - - if ci.IsLogin { - err, _ := doLogout(ci.RemoteAddr, ci.Uid) - if err == nil { - //logger.Printf("logout result %d", status) - } else { - logger.Printf("Logout failed, %s", err.Error()) - } - ci.IsLogin = false - } - }() - -Loop: - for { - select { - case <-done: - break Loop - case <-ticker.C: - timeDiff := time.Since(ci.HeartbeatTime) - if ci.IsLogin { - if timeDiff.Seconds() > heartBeatLossLimit/2 { - logger.Infof("Heartbeat lost for %f seconds", timeDiff.Seconds()) - } - - if timeDiff.Seconds() > heartBeatLossLimit { - logger.Warnln("Heartbeat lost, terminate!") - - err := ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat loss"), - time.Now().Add(time.Second)) - if err != nil { - logger.Println("write close:", err) - } - ws.Close() - break Loop - } - } else { - if timeDiff.Seconds() > notLoginLimit { - logger.Infof("Not loggined for %f seconds, kick!", timeDiff.Seconds()) - - err := ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "no login"), - time.Now().Add(time.Second)) - if err != nil { - logger.Println("write close:", err) - } - _ = ws.Close() - break Loop - } - } - case d := <-writeChan: - if d.MsgType == common.MsgTypeKickUser { - // 特殊消息,用于踢走用户 - logger.Infof("Login from another device, kick!") - - // 取消登录态,避免readloop结束时发logout消息,把新记录也清了 - ci.IsLogin = false - - err := ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "double login"), - time.Now().Add(time.Second*3)) - if err != nil { - logger.Println("write close:", err) - break - } - _ = ws.Close() - break Loop - } - - atomic.AddUint64(&serialNum, 1) - //logger.Infof("About to send msgType = %d, size = %d", d.MsgType, len(d.Data)) - data := common.EncodeMessage(d.MsgType, serialNum, d.Data) - err = ws.WriteMessage(websocket.BinaryMessage, data) - if err != nil { - logger.Println("write close:", err) - } - } - } - //b, _ := json.Marshal(ci) - //logger.Infof("exiting write loop:%s", string(b)) -} - -func processLogin(logger *log.Entry, clientAddr string, pbData []byte) (string, uint64, *userProxy.LoginRsp, error) { - rsp := &userProxy.LoginRsp{} - //logger := log.WithFields(log.Fields{ - // "addr": clientAddr, - //}) - - loginMsg := &userProxy.Login{} - var loginUid uint64 = 0 - err := proto.Unmarshal(pbData, loginMsg) - - if err == nil { - logger := logger.WithFields(log.Fields{ - "addr": clientAddr, - "token": loginMsg.Token, - }) - logger.Info(loginMsg.String()) - - err, rsp.Status, loginUid = doLogin(loginMsg.Token, clientAddr) - if err == nil { - logger.Infof("login status = %d, uid = %d,proxyAddr:%s,clientAddr:%s", rsp.Status, loginUid, myAddress, clientAddr) - } else { - logger.Warnf("login RPC failed for %d, %v", rsp.Status, err) - } - return loginMsg.Token, loginUid, rsp, err - } else { - logger.Warn("unmarshal error") - return loginMsg.Token, 0, rsp, err - } -} - -func processHeartBeat(pbData []byte) (uint32, string, error) { - heartBeat := &userProxy.HeartBeat{} - err := proto.Unmarshal(pbData, heartBeat) - if err == nil { - // FIXME: 增加heartbeat过频控制 - return 0, heartBeat.ExternalUid, err - } - return 0, "", err -} - -func processBizRequest(logger *log.Entry, uid uint64, pbData []byte) (*userProxy.BizResponse, error) { - //logger := log.WithFields(log.Fields{ - // "uid": uid, - //}) - - rsp := &userProxy.BizResponse{} - - msg := &userProxy.BizRequest{} - err := proto.Unmarshal(pbData, msg) - - if err == nil { - logger.Infof("doBizRequest msgType = %d, msg: %s", msg.Type, msg.PayLoad) - - rsp.Status, err = doBizRequest(uid, msg.Type, msg.PayLoad) - if err != nil { - logger.Warnf("doBizRequest RPC failed for %d, %v, uid = %d", uid, rsp.Status, err) - } - return rsp, err - } else { - logger.Warn("unmarshal error") - return rsp, err - } -} - -func processEnterRoom(logger *log.Entry, uid uint64, pbData []byte) error { - msg := &userProxy.EnterRoom{} - err := proto.Unmarshal(pbData, msg) - if err == nil { - err, status := doEnterRoom(uid, msg.GroupId) - if err == nil && status == common.Login_success { - //logger.Infof("enter room success,uid:%v,group:%v", uid, msg.GroupId) - } else { - logger.Warnf("login RPC failed for %d, %v", status, err) - } - return err - } else { - return err - } -} - -func processLeaveRoom(logger *log.Entry, uid uint64, pbData []byte) error { - msg := &userProxy.LeaveRoom{} - err := proto.Unmarshal(pbData, msg) - if err == nil { - err, status := doLeaveRoom(uid, msg.GroupId) - if err == nil && status == common.Login_success { - //logger.Infof("leave room success,uid:%v,group:%v", uid, msg.GroupId) - } else { - logger.Warnf("login RPC failed for %d, %v", status, err) - } - return err - } else { - return err - } -} - -func processRoomHeartbeat(logger *log.Entry, uid uint64, pbData []byte) error { - msg := &userProxy.RoomHeartBeat{} - err := proto.Unmarshal(pbData, msg) - if err == nil { - err, status := doRoomHeartbeat(uid, msg.GroupId) - if err == nil && status == common.Login_success { - //logger.Infof("room heartbeat success,uid:%v,group:%v", uid, msg.GroupId) - } else { - logger.Warnf("login RPC failed for %d, %v", status, err) - } - return err - } else { - return err - } -} - -func processGroupMicChangeRsp(logger *log.Entry, uid uint64, pbData []byte) error { - msg := &userProxy.GroupMicChangeRsp{} - err := proto.Unmarshal(pbData, msg) - if err == nil { - //logger.Infof("groupMicChangeRsp,uid:%v,seqId:%v", uid, msg.SeqId) - } else { - return err - } - return err -} - -// 直接发匹配成功消息,调试用 -func homePage(w http.ResponseWriter, r *http.Request) { - rsp := "Home Page" - var targetUserId uint64 = 0 - fromUserId := "" - toUserId := "" - if r.ParseForm() == nil { - targetUserId, _ = strconv.ParseUint(r.FormValue("target"), 10, 64) - fromUserId = r.FormValue("from") - toUserId = r.FormValue("to") - } - - if targetUserId == 0 || len(fromUserId) == 0 || len(toUserId) == 0 { - rsp = "targetUserId or fromUserId or toUserId is missing." - } else { - m := getChans(targetUserId) - if m == nil { - rsp = "User not found" - } else { - for i, c := range m { - rsp += fmt.Sprintf("address %d bound to channel %v", i, c) - } - } - } - fmt.Fprint(w, rsp) -} - -func setUserChan(uid uint64, addr string, c chan ProtoData) { - m.Lock() - defer m.Unlock() - if userChan[uid] == nil { - userChan[uid] = make(map[string]chan ProtoData) - } - userChan[uid][addr] = c -} - -func getUserChan(uid uint64, addr string) chan ProtoData { - m.RLock() - defer m.RUnlock() - if userChan[uid] == nil { - return nil - } else { - return userChan[uid][addr] - } -} - -func getChans(uid uint64) map[string]chan ProtoData { - m.RLock() - defer m.RUnlock() - return userChan[uid] -} - -func removeUserChan(uid uint64, addr string) int { - m.Lock() - defer m.Unlock() - if chs, ok := userChan[uid]; ok { - delete(chs, addr) - if len(chs) == 0 { - delete(userChan, uid) - } - } - return len(userChan) -} - -const ( - rpcListenPortBase = 50050 -) - -type server struct { - userCenter.UnimplementedRouterServer -} - -func (s *server) Route(ctx context.Context, in *userCenter.RouteMessage) (*userCenter.RouteMessageRsp, error) { - defer func() { - if r := recover(); r != nil { - //打印错误堆栈信息 - mylogrus.MyLog.Errorf("Route SYSTEM ACTION PANIC: %v, stack: %v", r, string(debug.Stack())) - } - }() - //peerInfo, _ := peer.FromContext(ctx) - //mylogrus.MyLog.Infof("Received Route request from %s: msgType = %d, uid = %d, payload size = %d", - // peerInfo.Addr.String(), in.GetMsgType(), in.GetUid(), len(in.GetPayLoad())) - - var status uint32 = common.ROUTE_SUCCESS - m := getChans(in.GetUid()) - if len(m) > 0 { - for _, c := range m { - //mylogrus.MyLog.Infof("Route to user %d, addr %s, channel %v", in.Uid, i, c) - c <- ProtoData{ - MsgType: in.GetMsgType(), - Data: in.GetPayLoad(), - } - } - } else { - status = common.ROUTE_CHANNEL_NOT_FOUND - mylogrus.MyLog.Warnf("No write channel for user %d", in.GetUid()) - } - return &userCenter.RouteMessageRsp{Status: status}, nil -} - -func (s *server) KickUser(ctx context.Context, in *userCenter.KickMessage) (*userCenter.KickMessageRsp, error) { - peerInfo, _ := peer.FromContext(ctx) - logger := mylogrus.MyLog.WithFields(log.Fields{ - "uid": in.Uid, - "addr": in.Addr, - }) - logger.Infof("Received KickUser request from %s", peerInfo.Addr.String()) - - c := getUserChan(in.Uid, in.Addr) - if c == nil { - logger.Infof("No write channel") - } else { - c <- ProtoData{ - MsgType: common.MsgTypeKickUser, - } - } - return &userCenter.KickMessageRsp{Status: 0}, nil -} - -var userClient userCenter.UserClient - -func doLogin(token string, clientAddr string) (error, uint32, uint64) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - r, err := userClient.Login(ctx, &userCenter.LoginMessage{ - Token: token, - ProxyAddr: myAddress, - ClientAddr: clientAddr, - }) - if err == nil && r != nil { - return err, r.Status, r.Uid - } else { - return err, 0, 0 - } -} - -func doLogout(addr string, uid uint64) (error, uint32) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - r, err := userClient.Logout(ctx, &userCenter.LogoutMessage{ - ClientAddr: addr, - Uid: uid, - }) - if err == nil && r != nil { - return err, r.Status - } else { - return err, 0 - } -} - -func doBizRequest(uid uint64, msgType uint32, payLoad string) (uint32, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - r, err := userClient.Transmit(ctx, &userCenter.BizMessage{ - Uid: uid, - MsgType: msgType, - PayLoad: payLoad, - }) - if err == nil && r != nil { - return r.Status, err - } else { - return 0, err - } -} - -func doEnterRoom(uid uint64, groupId string) (error, uint32) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - r, err := userClient.EnterRoom(ctx, &userCenter.EnterRoomMessage{ - Uid: uid, - GroupId: groupId, - }) - if err == nil && r != nil { - return err, r.Status - } else { - return err, 0 - } -} - -func doLeaveRoom(uid uint64, groupId string) (error, uint32) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - r, err := userClient.LeaveRoom(ctx, &userCenter.LeaveRoomMessage{ - Uid: uid, - GroupId: groupId, - }) - if err == nil && r != nil { - return err, r.Status - } else { - return err, 0 - } -} - -func doRoomHeartbeat(uid uint64, groupId string) (error, uint32) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - - r, err := userClient.RoomHeartbeat(ctx, &userCenter.RoomHeartbeatMessage{ - Uid: uid, - GroupId: groupId, - }) - if err == nil && r != nil { - return err, r.Status - } else { - return err, 0 - } -} - -var kacp = keepalive.ClientParameters{ - Time: 10 * time.Second, // send pings every 10 seconds if there is no activity - Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead - PermitWithoutStream: true, // send pings even without active streams -} - -var ( - defaultUserCenterAddr = "127.0.0.1:50040" // userCenter default addr - userCenterAddr = defaultUserCenterAddr - userCenterConsulName = "userCenter" ) -// grpc服务发现 -type Builder struct { - addrs map[string][]string - - cc resolver.ClientConn -} - -func (b *Builder) Scheme() string { - return "uc" // userCenter -} - -type Resolver struct { -} - -func (r Resolver) ResolveNow(options resolver.ResolveNowOptions) {} - -func (r Resolver) Close() {} - -func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - r := &Resolver{} - paths := b.addrs[target.URL.Path] - addrs := make([]resolver.Address, len(paths)) - for i, s := range paths { - addrs[i] = resolver.Address{Addr: s} - } - cc.UpdateState(resolver.State{Addresses: addrs}) - b.cc = cc - return r, nil -} - -func (b *Builder) UpdateState(addrs []string) { - as := make([]resolver.Address, len(addrs)) - for i, s := range addrs { - as[i] = resolver.Address{Addr: s} - } - b.cc.UpdateState(resolver.State{Addresses: as}) -} - -var certFile = flag.String("certFile", "ws.faceline.live.pem", "cert file") -var keyFile = flag.String("keyFile", "ws.faceline.live.key", "key file") -var listenPort = flag.String("port", "8081", "listen port") - func main() { - flag.Parse() - // init redis cluster - rdbCluster := redis.NewClient(&redis.Options{ - Addr: config.GetConfigRedis().REDIS_CLUSTER_HOST, - Password: config.GetConfigRedis().REDIS_CLUSTER_PASSWORD, + // 代理目标地址 + // 可设置握手前回调函数,修改request信息,增减头部,权限验证等等 + wp, err := websocketproxy.NewProxy("wss://162.62.97.146:8082/ws", func(r *http.Request) error { + // 握手时设置cookie, 权限验证 + r.Header.Set("Cookie", "----") + // 伪装来源 + //r.Header.Set("Origin", "http://82.157.123.54:9010") + return nil }) - - myLocalIp, err := common.GetClientIpV2() if err != nil { - mylogrus.MyLog.Fatal(err) - } - - bd := &Builder{addrs: map[string][]string{"/api": {userCenterAddr}}} - if rdbCluster != nil { - redisKey := fmt.Sprintf("service:userCenter") - ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{ - Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳 - Max: "+inf", - }).Result() - if err != nil { - failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err) - mylogrus.MyLog.Errorf(failMsg) - } else if len(ipPorts) > 0 { - bd = &Builder{addrs: map[string][]string{"/api": ipPorts}} - userCenterAddr = "uc:///api" - mylogrus.MyLog.Infof("userCenterAddr:%v,addr:%v", userCenterAddr, ipPorts) - } + panic(err) + //t.Fatal() } - // 服务发现 - resolver.Register(bd) - go func() { - //consul.RegisterWatcher(userCenterConsulName, func(addr []string) { - // if len(addr) > 0 { - // bd.UpdateState(addr) // 更新新的注册名 - // } - //}) - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - redisKey := fmt.Sprintf("service:userCenter") - ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{ - Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳 - Max: "+inf", - }).Result() - if err != nil { - failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err) - mylogrus.MyLog.Errorf(failMsg) - break - } - if len(ipPorts) <= 0 { - failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userCenter", err) - mylogrus.MyLog.Errorf(failMsg) - break - } else { - bd.UpdateState(ipPorts) // 更新新的注册名 - } - } - } - }() - - // Set up a connection to the userCenter. - conn, err := grpc.Dial(userCenterAddr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(kacp), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy":"%s"}`, "round_robin"))) - if err != nil { - mylogrus.MyLog.Fatalf("did not connect: %v", err) - } - //defer conn.Close() - userClient = userCenter.NewUserClient(conn) - if userClient == nil { - mylogrus.MyLog.Fatalln("userClient null") - } - - go func() { - var i int - var lis net.Listener - var err error - for i = 0; i < 10; i++ { - addr := ":" + strconv.Itoa(rpcListenPortBase+i) - lis, err = net.Listen("tcp", addr) - if err == nil { - mylogrus.MyLog.Infof("Go RPC listening on %s", lis.Addr().String()) - break - } - } - if i >= 10 { - mylogrus.MyLog.Fatalln("No RPC Listen port available.") - } - myAddress = myLocalIp + ":" + strconv.Itoa(rpcListenPortBase+i) - - mylogrus.MyLog.Infof("My address is %s", myAddress) - - s := grpc.NewServer() - userCenter.RegisterRouterServer(s, &server{}) - if err := s.Serve(lis); err != nil { - mylogrus.MyLog.Fatalf("failed to serve: %v", err) - } - }() - - setupRoutes() - - addr := ":" + *listenPort - mylogrus.MyLog.Infof("Go Websocket listening on %s", addr) - // http ws - mux := http.NewServeMux() - mux.HandleFunc("/ws", serverWebsocket) - go http.ListenAndServe(":8082", mux) - - // https wss - mylogrus.MyLog.Infof("certFile = %s, keyFile = %s", *certFile, *keyFile) - mylogrus.MyLog.Printf("%s", http.ListenAndServeTLS(addr, *certFile, *keyFile, nil).Error()) + // 设置代理路径 + http.HandleFunc("/ws", wp.Proxy) + http.ListenAndServe(":8082", nil) }