Commit 6a613b4d authored by hujiebin's avatar hujiebin

8082转发proxy

parent e28bf9f8
...@@ -25,6 +25,7 @@ require ( ...@@ -25,6 +25,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308 // indirect
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 // indirect github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 // indirect
github.com/satori/go.uuid v1.2.0 // indirect github.com/satori/go.uuid v1.2.0 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect github.com/sirupsen/logrus v1.7.0 // indirect
......
...@@ -131,6 +131,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE ...@@ -131,6 +131,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308 h1:JfSau4YABtkm5gRtFWuRWHT2Lsw4ZbyB4F/qORwf+BA=
github.com/pretty66/websocketproxy v0.0.0-20220507015215-930b3a686308/go.mod h1:hxhFuMswfNko9fAxYeqBapfUdJHAgDafBs/MzOZh0X8=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
......
package main package main
import ( import (
"context" "github.com/pretty66/websocketproxy"
"flag"
"fmt"
"github.com/go-redis/redis/v8"
"hilo-userProxy/common"
"hilo-userProxy/common/config"
"net"
"net/http" "net/http"
_ "net/http/pprof"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"hilo-userProxy/common/mylogrus"
"hilo-userProxy/protocol/userCenter"
"hilo-userProxy/protocol/userProxy"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
)
// 序号号,一个进程一个
var serialNum uint64 = 0
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
}
type ProtoData struct {
MsgType uint32
Data []byte
}
var myAddress = ""
var m sync.RWMutex
var userChan = make(map[uint64]map[string]chan ProtoData)
/* 其中CheckOringin是一个函数,该函数用于拦截或放行跨域请求。函数返回值为bool类型,即true放行,false拦截。
如果请求不是跨域请求可以不赋值,我这里是跨域请求并且为了方便直接返回true
*/
func checkOrigin(r *http.Request) bool {
return true
}
const (
notLoginLimit = 10.0
heartBeatLossLimit = 30.0
)
type ConnectionInfo struct {
RemoteAddr string `json:"remote_addr"`
ConnectTime time.Time
HeartbeatTime time.Time
IsLogin bool
Uid uint64 `json:"uid"`
MsgId uint64 `json:"msg_id"`
ExternalId string `json:"external_id"`
Token string `json:"token"`
}
func setupRoutes() {
http.HandleFunc("/", homePage)
http.HandleFunc("/ws", serverWebsocket)
}
func getShortToken(token string) string {
s := strings.Split(token, ".")
if len(s) == 0 {
return token
} else {
return s[len(s)-1]
}
}
func serverWebsocket(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
mylogrus.MyLog.Errorf("upgrade err:%v", err)
return
}
now := time.Now()
ci := ConnectionInfo{
RemoteAddr: ws.RemoteAddr().String(),
ConnectTime: now,
HeartbeatTime: now,
IsLogin: false,
Uid: 0,
MsgId: 0,
ExternalId: "",
Token: "",
}
traceId := uuid.NewV4().String()
logger := mylogrus.MyLog.WithFields(log.Fields{
"addr": ci.RemoteAddr,
"token": ci.Token,
"traceId": traceId,
})
logger.Infof("Websocket client successfully Connected at %s", ci.ConnectTime.String())
done := make(chan struct{})
writeChan := make(chan ProtoData, 1000)
ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
go func() {
defer common.CheckGoPanic()
defer ws.Close()
defer close(done)
//defer close(writeChan) // 读断了就把写也断了
for {
messageType, message, err := ws.ReadMessage()
if err != nil {
logger.Infof("read error:%v", err)
logger = logger.WithFields(log.Fields{
"read_err": err.Error(),
})
break
}
if messageType != websocket.BinaryMessage {
logger.Errorf("Unexpected messageType %d", messageType)
continue
}
msgType, msgId, timeStamp, pbData, err := common.DecodeMessage(message)
if err != nil {
logger.Errorf("Decode error %s", err.Error())
continue
}
if msgId <= ci.MsgId {
logger.Infof("Discard outdated message msgId = %d <= %d, msgType = %d", msgId, ci.MsgId, msgType)
} else {
ci.MsgId = msgId
// 统计时延
now := time.Now()
us := now.UnixNano()/1000 - int64(timeStamp)
if us > 200000 {
//logger.Infof("Message take %d us to come here.", us)
}
if msgType == common.MsgTypeLogin {
if ci.IsLogin {
logger.Infof("Connection already logged in %+v", ci)
} else {
var rsp *userProxy.LoginRsp
ci.Token, ci.Uid, rsp, err = processLogin(logger, ci.RemoteAddr, pbData)
if err == nil && rsp != nil {
logger = logger.WithFields(log.Fields{
"addr": ci.RemoteAddr,
"userId": ci.Uid,
"sToken": getShortToken(ci.Token),
})
if buffer, err := proto.Marshal(rsp); err == nil {
writeChan <- ProtoData{
MsgType: common.MsgTypeLoginRsp,
Data: buffer,
}
}
if rsp.Status == common.Login_success {
ci.IsLogin = true
ci.HeartbeatTime = now
//logger.Infof("Bind to channel %v", writeChan)
setUserChan(ci.Uid, ci.RemoteAddr, writeChan)
}
}
}
} else if msgType == common.MsgTypeHeartBeat {
status, extUid, err := processHeartBeat(pbData)
//logger.Infof("heartbeat")
if err == nil {
ci.HeartbeatTime = time.Now()
if len(ci.ExternalId) == 0 {
//logger.Infof("Received first heartbeat")
ci.ExternalId = extUid
logger = logger.WithFields(log.Fields{
"addr": ci.RemoteAddr,
"userId": ci.Uid,
"sToken": getShortToken(ci.Token),
"externalId": ci.ExternalId,
})
}
msg := &userProxy.HeartBeatRsp{
Status: status,
}
if buffer, err := proto.Marshal(msg); err == nil {
writeChan <- ProtoData{
MsgType: common.MsgTypeHeartBeatRsp,
Data: buffer,
}
}
}
} else if msgType == common.MsgTypeGlobalGiftBannerRsp {
logger = logger.WithFields(log.Fields{
"addr": ci.RemoteAddr,
"userId": ci.Uid,
"sToken": getShortToken(ci.Token),
"externalId": ci.ExternalId,
})
rsp := &userProxy.GlobalGiftBannerRsp{}
err := proto.Unmarshal(pbData, rsp)
if err == nil {
//logger.Infof("GlobalGiftBannerRsp, msgType = %d, %v", msgType, rsp)
} else {
logger.Errorf("Unmarshal error")
}
} else if msgType == common.MsgTypeBiz {
logger = logger.WithFields(log.Fields{
"addr": ci.RemoteAddr,
"userId": ci.Uid,
})
rsp, err := processBizRequest(logger, ci.Uid, pbData)
if err == nil && rsp != nil {
//logger.Infof("processBizRequest rsp %+v", rsp)
if buffer, err := proto.Marshal(rsp); err == nil {
writeChan <- ProtoData{
MsgType: common.MsgTypeBizRsp,
Data: buffer,
}
}
}
} else if msgType == common.MsgTypeEnterRoom {
if err := processEnterRoom(logger, ci.Uid, pbData); err != nil {
logger.Errorf("processEnterRoom fail:%v", err)
}
} else if msgType == common.MsgTypeLeaveRoom {
if err := processLeaveRoom(logger, ci.Uid, pbData); err != nil {
logger.Errorf("processLeaveRoom fail:%v", err)
}
} else if msgType == common.MsgTypeRoomHeartbeat {
if err := processRoomHeartbeat(logger, ci.Uid, pbData); err != nil {
logger.Errorf("processRoomHeartbeat fail:%v", err)
}
} else if msgType == common.MsgTypeGroupMicChangeRsp {
if err := processGroupMicChangeRsp(logger, ci.Uid, pbData); err != nil {
logger.Errorf("processGroupMicChangeRsp fail:%v", err)
}
} else {
logger.Warnf("Unknown message type %d", msgType)
}
if err != nil {
logger.Infof("process message error %s", err.Error())
}
}
}
logger.Infof("exiting read loop for token %s, user %d", ci.Token, ci.Uid)
removeUserChan(ci.Uid, ci.RemoteAddr)
//b, _ := json.Marshal(ci)
//logger.Infof("exiting read loop for %s, size = %d", string(b), sz)
if ci.IsLogin {
err, _ := doLogout(ci.RemoteAddr, ci.Uid)
if err == nil {
//logger.Printf("logout result %d", status)
} else {
logger.Printf("Logout failed, %s", err.Error())
}
ci.IsLogin = false
}
}()
Loop:
for {
select {
case <-done:
break Loop
case <-ticker.C:
timeDiff := time.Since(ci.HeartbeatTime)
if ci.IsLogin {
if timeDiff.Seconds() > heartBeatLossLimit/2 {
logger.Infof("Heartbeat lost for %f seconds", timeDiff.Seconds())
}
if timeDiff.Seconds() > heartBeatLossLimit {
logger.Warnln("Heartbeat lost, terminate!")
err := ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat loss"),
time.Now().Add(time.Second))
if err != nil {
logger.Println("write close:", err)
}
ws.Close()
break Loop
}
} else {
if timeDiff.Seconds() > notLoginLimit {
logger.Infof("Not loggined for %f seconds, kick!", timeDiff.Seconds())
err := ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "no login"),
time.Now().Add(time.Second))
if err != nil {
logger.Println("write close:", err)
}
_ = ws.Close()
break Loop
}
}
case d := <-writeChan:
if d.MsgType == common.MsgTypeKickUser {
// 特殊消息,用于踢走用户
logger.Infof("Login from another device, kick!")
// 取消登录态,避免readloop结束时发logout消息,把新记录也清了
ci.IsLogin = false
err := ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "double login"),
time.Now().Add(time.Second*3))
if err != nil {
logger.Println("write close:", err)
break
}
_ = ws.Close()
break Loop
}
atomic.AddUint64(&serialNum, 1)
//logger.Infof("About to send msgType = %d, size = %d", d.MsgType, len(d.Data))
data := common.EncodeMessage(d.MsgType, serialNum, d.Data)
err = ws.WriteMessage(websocket.BinaryMessage, data)
if err != nil {
logger.Println("write close:", err)
}
}
}
//b, _ := json.Marshal(ci)
//logger.Infof("exiting write loop:%s", string(b))
}
func processLogin(logger *log.Entry, clientAddr string, pbData []byte) (string, uint64, *userProxy.LoginRsp, error) {
rsp := &userProxy.LoginRsp{}
//logger := log.WithFields(log.Fields{
// "addr": clientAddr,
//})
loginMsg := &userProxy.Login{}
var loginUid uint64 = 0
err := proto.Unmarshal(pbData, loginMsg)
if err == nil {
logger := logger.WithFields(log.Fields{
"addr": clientAddr,
"token": loginMsg.Token,
})
logger.Info(loginMsg.String())
err, rsp.Status, loginUid = doLogin(loginMsg.Token, clientAddr)
if err == nil {
logger.Infof("login status = %d, uid = %d,proxyAddr:%s,clientAddr:%s", rsp.Status, loginUid, myAddress, clientAddr)
} else {
logger.Warnf("login RPC failed for %d, %v", rsp.Status, err)
}
return loginMsg.Token, loginUid, rsp, err
} else {
logger.Warn("unmarshal error")
return loginMsg.Token, 0, rsp, err
}
}
func processHeartBeat(pbData []byte) (uint32, string, error) {
heartBeat := &userProxy.HeartBeat{}
err := proto.Unmarshal(pbData, heartBeat)
if err == nil {
// FIXME: 增加heartbeat过频控制
return 0, heartBeat.ExternalUid, err
}
return 0, "", err
}
func processBizRequest(logger *log.Entry, uid uint64, pbData []byte) (*userProxy.BizResponse, error) {
//logger := log.WithFields(log.Fields{
// "uid": uid,
//})
rsp := &userProxy.BizResponse{}
msg := &userProxy.BizRequest{}
err := proto.Unmarshal(pbData, msg)
if err == nil {
logger.Infof("doBizRequest msgType = %d, msg: %s", msg.Type, msg.PayLoad)
rsp.Status, err = doBizRequest(uid, msg.Type, msg.PayLoad)
if err != nil {
logger.Warnf("doBizRequest RPC failed for %d, %v, uid = %d", uid, rsp.Status, err)
}
return rsp, err
} else {
logger.Warn("unmarshal error")
return rsp, err
}
}
func processEnterRoom(logger *log.Entry, uid uint64, pbData []byte) error {
msg := &userProxy.EnterRoom{}
err := proto.Unmarshal(pbData, msg)
if err == nil {
err, status := doEnterRoom(uid, msg.GroupId)
if err == nil && status == common.Login_success {
//logger.Infof("enter room success,uid:%v,group:%v", uid, msg.GroupId)
} else {
logger.Warnf("login RPC failed for %d, %v", status, err)
}
return err
} else {
return err
}
}
func processLeaveRoom(logger *log.Entry, uid uint64, pbData []byte) error {
msg := &userProxy.LeaveRoom{}
err := proto.Unmarshal(pbData, msg)
if err == nil {
err, status := doLeaveRoom(uid, msg.GroupId)
if err == nil && status == common.Login_success {
//logger.Infof("leave room success,uid:%v,group:%v", uid, msg.GroupId)
} else {
logger.Warnf("login RPC failed for %d, %v", status, err)
}
return err
} else {
return err
}
}
func processRoomHeartbeat(logger *log.Entry, uid uint64, pbData []byte) error {
msg := &userProxy.RoomHeartBeat{}
err := proto.Unmarshal(pbData, msg)
if err == nil {
err, status := doRoomHeartbeat(uid, msg.GroupId)
if err == nil && status == common.Login_success {
//logger.Infof("room heartbeat success,uid:%v,group:%v", uid, msg.GroupId)
} else {
logger.Warnf("login RPC failed for %d, %v", status, err)
}
return err
} else {
return err
}
}
func processGroupMicChangeRsp(logger *log.Entry, uid uint64, pbData []byte) error {
msg := &userProxy.GroupMicChangeRsp{}
err := proto.Unmarshal(pbData, msg)
if err == nil {
//logger.Infof("groupMicChangeRsp,uid:%v,seqId:%v", uid, msg.SeqId)
} else {
return err
}
return err
}
// 直接发匹配成功消息,调试用
func homePage(w http.ResponseWriter, r *http.Request) {
rsp := "Home Page"
var targetUserId uint64 = 0
fromUserId := ""
toUserId := ""
if r.ParseForm() == nil {
targetUserId, _ = strconv.ParseUint(r.FormValue("target"), 10, 64)
fromUserId = r.FormValue("from")
toUserId = r.FormValue("to")
}
if targetUserId == 0 || len(fromUserId) == 0 || len(toUserId) == 0 {
rsp = "targetUserId or fromUserId or toUserId is missing."
} else {
m := getChans(targetUserId)
if m == nil {
rsp = "User not found"
} else {
for i, c := range m {
rsp += fmt.Sprintf("address %d bound to channel %v", i, c)
}
}
}
fmt.Fprint(w, rsp)
}
func setUserChan(uid uint64, addr string, c chan ProtoData) {
m.Lock()
defer m.Unlock()
if userChan[uid] == nil {
userChan[uid] = make(map[string]chan ProtoData)
}
userChan[uid][addr] = c
}
func getUserChan(uid uint64, addr string) chan ProtoData {
m.RLock()
defer m.RUnlock()
if userChan[uid] == nil {
return nil
} else {
return userChan[uid][addr]
}
}
func getChans(uid uint64) map[string]chan ProtoData {
m.RLock()
defer m.RUnlock()
return userChan[uid]
}
func removeUserChan(uid uint64, addr string) int {
m.Lock()
defer m.Unlock()
if chs, ok := userChan[uid]; ok {
delete(chs, addr)
if len(chs) == 0 {
delete(userChan, uid)
}
}
return len(userChan)
}
const (
rpcListenPortBase = 50050
)
type server struct {
userCenter.UnimplementedRouterServer
}
func (s *server) Route(ctx context.Context, in *userCenter.RouteMessage) (*userCenter.RouteMessageRsp, error) {
defer func() {
if r := recover(); r != nil {
//打印错误堆栈信息
mylogrus.MyLog.Errorf("Route SYSTEM ACTION PANIC: %v, stack: %v", r, string(debug.Stack()))
}
}()
//peerInfo, _ := peer.FromContext(ctx)
//mylogrus.MyLog.Infof("Received Route request from %s: msgType = %d, uid = %d, payload size = %d",
// peerInfo.Addr.String(), in.GetMsgType(), in.GetUid(), len(in.GetPayLoad()))
var status uint32 = common.ROUTE_SUCCESS
m := getChans(in.GetUid())
if len(m) > 0 {
for _, c := range m {
//mylogrus.MyLog.Infof("Route to user %d, addr %s, channel %v", in.Uid, i, c)
c <- ProtoData{
MsgType: in.GetMsgType(),
Data: in.GetPayLoad(),
}
}
} else {
status = common.ROUTE_CHANNEL_NOT_FOUND
mylogrus.MyLog.Warnf("No write channel for user %d", in.GetUid())
}
return &userCenter.RouteMessageRsp{Status: status}, nil
}
func (s *server) KickUser(ctx context.Context, in *userCenter.KickMessage) (*userCenter.KickMessageRsp, error) {
peerInfo, _ := peer.FromContext(ctx)
logger := mylogrus.MyLog.WithFields(log.Fields{
"uid": in.Uid,
"addr": in.Addr,
})
logger.Infof("Received KickUser request from %s", peerInfo.Addr.String())
c := getUserChan(in.Uid, in.Addr)
if c == nil {
logger.Infof("No write channel")
} else {
c <- ProtoData{
MsgType: common.MsgTypeKickUser,
}
}
return &userCenter.KickMessageRsp{Status: 0}, nil
}
var userClient userCenter.UserClient
func doLogin(token string, clientAddr string) (error, uint32, uint64) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.Login(ctx, &userCenter.LoginMessage{
Token: token,
ProxyAddr: myAddress,
ClientAddr: clientAddr,
})
if err == nil && r != nil {
return err, r.Status, r.Uid
} else {
return err, 0, 0
}
}
func doLogout(addr string, uid uint64) (error, uint32) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.Logout(ctx, &userCenter.LogoutMessage{
ClientAddr: addr,
Uid: uid,
})
if err == nil && r != nil {
return err, r.Status
} else {
return err, 0
}
}
func doBizRequest(uid uint64, msgType uint32, payLoad string) (uint32, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.Transmit(ctx, &userCenter.BizMessage{
Uid: uid,
MsgType: msgType,
PayLoad: payLoad,
})
if err == nil && r != nil {
return r.Status, err
} else {
return 0, err
}
}
func doEnterRoom(uid uint64, groupId string) (error, uint32) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.EnterRoom(ctx, &userCenter.EnterRoomMessage{
Uid: uid,
GroupId: groupId,
})
if err == nil && r != nil {
return err, r.Status
} else {
return err, 0
}
}
func doLeaveRoom(uid uint64, groupId string) (error, uint32) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.LeaveRoom(ctx, &userCenter.LeaveRoomMessage{
Uid: uid,
GroupId: groupId,
})
if err == nil && r != nil {
return err, r.Status
} else {
return err, 0
}
}
func doRoomHeartbeat(uid uint64, groupId string) (error, uint32) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
r, err := userClient.RoomHeartbeat(ctx, &userCenter.RoomHeartbeatMessage{
Uid: uid,
GroupId: groupId,
})
if err == nil && r != nil {
return err, r.Status
} else {
return err, 0
}
}
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
var (
defaultUserCenterAddr = "127.0.0.1:50040" // userCenter default addr
userCenterAddr = defaultUserCenterAddr
userCenterConsulName = "userCenter"
) )
// grpc服务发现
type Builder struct {
addrs map[string][]string
cc resolver.ClientConn
}
func (b *Builder) Scheme() string {
return "uc" // userCenter
}
type Resolver struct {
}
func (r Resolver) ResolveNow(options resolver.ResolveNowOptions) {}
func (r Resolver) Close() {}
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &Resolver{}
paths := b.addrs[target.URL.Path]
addrs := make([]resolver.Address, len(paths))
for i, s := range paths {
addrs[i] = resolver.Address{Addr: s}
}
cc.UpdateState(resolver.State{Addresses: addrs})
b.cc = cc
return r, nil
}
func (b *Builder) UpdateState(addrs []string) {
as := make([]resolver.Address, len(addrs))
for i, s := range addrs {
as[i] = resolver.Address{Addr: s}
}
b.cc.UpdateState(resolver.State{Addresses: as})
}
var certFile = flag.String("certFile", "ws.faceline.live.pem", "cert file")
var keyFile = flag.String("keyFile", "ws.faceline.live.key", "key file")
var listenPort = flag.String("port", "8081", "listen port")
func main() { func main() {
flag.Parse()
// init redis cluster // 代理目标地址
rdbCluster := redis.NewClient(&redis.Options{ // 可设置握手前回调函数,修改request信息,增减头部,权限验证等等
Addr: config.GetConfigRedis().REDIS_CLUSTER_HOST, wp, err := websocketproxy.NewProxy("wss://162.62.97.146:8082/ws", func(r *http.Request) error {
Password: config.GetConfigRedis().REDIS_CLUSTER_PASSWORD, // 握手时设置cookie, 权限验证
r.Header.Set("Cookie", "----")
// 伪装来源
//r.Header.Set("Origin", "http://82.157.123.54:9010")
return nil
}) })
myLocalIp, err := common.GetClientIpV2()
if err != nil { if err != nil {
mylogrus.MyLog.Fatal(err) panic(err)
} //t.Fatal()
bd := &Builder{addrs: map[string][]string{"/api": {userCenterAddr}}}
if rdbCluster != nil {
redisKey := fmt.Sprintf("service:userCenter")
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err)
mylogrus.MyLog.Errorf(failMsg)
} else if len(ipPorts) > 0 {
bd = &Builder{addrs: map[string][]string{"/api": ipPorts}}
userCenterAddr = "uc:///api"
mylogrus.MyLog.Infof("userCenterAddr:%v,addr:%v", userCenterAddr, ipPorts)
}
} }
// 服务发现 // 设置代理路径
resolver.Register(bd) http.HandleFunc("/ws", wp.Proxy)
go func() { http.ListenAndServe(":8082", nil)
//consul.RegisterWatcher(userCenterConsulName, func(addr []string) {
// if len(addr) > 0 {
// bd.UpdateState(addr) // 更新新的注册名
// }
//})
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
redisKey := fmt.Sprintf("service:userCenter")
ipPorts, err := rdbCluster.ZRangeByScore(context.Background(), redisKey, &redis.ZRangeBy{
Min: fmt.Sprintf("%d", time.Now().Add(-time.Second*15).Unix()), // 3倍心跳
Max: "+inf",
}).Result()
if err != nil {
failMsg := fmt.Sprintf("get service fail,svc:%v,err:%v", "userCenter", err)
mylogrus.MyLog.Errorf(failMsg)
break
}
if len(ipPorts) <= 0 {
failMsg := fmt.Sprintf("get service empty,svc:%v,err:%v", "userCenter", err)
mylogrus.MyLog.Errorf(failMsg)
break
} else {
bd.UpdateState(ipPorts) // 更新新的注册名
}
}
}
}()
// Set up a connection to the userCenter.
conn, err := grpc.Dial(userCenterAddr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(kacp),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy":"%s"}`, "round_robin")))
if err != nil {
mylogrus.MyLog.Fatalf("did not connect: %v", err)
}
//defer conn.Close()
userClient = userCenter.NewUserClient(conn)
if userClient == nil {
mylogrus.MyLog.Fatalln("userClient null")
}
go func() {
var i int
var lis net.Listener
var err error
for i = 0; i < 10; i++ {
addr := ":" + strconv.Itoa(rpcListenPortBase+i)
lis, err = net.Listen("tcp", addr)
if err == nil {
mylogrus.MyLog.Infof("Go RPC listening on %s", lis.Addr().String())
break
}
}
if i >= 10 {
mylogrus.MyLog.Fatalln("No RPC Listen port available.")
}
myAddress = myLocalIp + ":" + strconv.Itoa(rpcListenPortBase+i)
mylogrus.MyLog.Infof("My address is %s", myAddress)
s := grpc.NewServer()
userCenter.RegisterRouterServer(s, &server{})
if err := s.Serve(lis); err != nil {
mylogrus.MyLog.Fatalf("failed to serve: %v", err)
}
}()
setupRoutes()
addr := ":" + *listenPort
mylogrus.MyLog.Infof("Go Websocket listening on %s", addr)
// http ws
mux := http.NewServeMux()
mux.HandleFunc("/ws", serverWebsocket)
go http.ListenAndServe(":8082", mux)
// https wss
mylogrus.MyLog.Infof("certFile = %s, keyFile = %s", *certFile, *keyFile)
mylogrus.MyLog.Printf("%s", http.ListenAndServeTLS(addr, *certFile, *keyFile, nil).Error())
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment