diff --git a/common/redisCli/redis.go b/common/redisCli/redis.go index a2b1e3dc0ef28d6cda4991ae826c7720b14d1428..c486ea9eaa45de78437e8ea4601b7a30d3ee6076 100644 --- a/common/redisCli/redis.go +++ b/common/redisCli/redis.go @@ -9,6 +9,7 @@ import ( ) var RedisCluster *redis.Client +var RedisClient *redis.Client func init() { RedisCluster = redis.NewClient(&redis.Options{ @@ -25,6 +26,21 @@ func init() { } else { log.Println("redis db0 connection success - " + pong) } + + RedisClient = redis.NewClient(&redis.Options{ + Addr: config.GetConfigRedis().REDIS_HOST, + Password: config.GetConfigRedis().REDIS_PASSWORD, // no password set + DB: 0, // use default DB + PoolSize: 200, + MinIdleConns: 20, + }) + pong, err = RedisClient.Ping(context.Background()).Result() + if err != nil { + mylogrus.MyLog.Warn(err) + mylogrus.MyLog.Fatal("redis db0 connect fail") + } else { + log.Println("redis db0 connection success - " + pong) + } // log hook //RedisClient.AddHook(redisHook{}) } @@ -32,3 +48,7 @@ func init() { func GetRedisCluster() *redis.Client { return RedisCluster } + +func GetRedisClient() *redis.Client { + return RedisClient +} diff --git a/main.go b/main.go index 4491e3366996e531ca939af491c89a52d2aa6ebf..0fe98104515b12f9a814a2cecd0122ec650174b0 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,9 @@ func main() { go func() { deal() }() + go func() { + dealOld() // 临时处理 + }() } for i := 0; i < SEND_WORKER; i++ { go func() { @@ -89,6 +92,33 @@ func deal() { } } +func dealOld() { + for true { + //不需要加锁,注意,阻塞。 + strs, err := redisCli.GetRedisClient().BLPop(context.Background(), time.Second, micInfoChange).Result() + if err != nil { + if err != redis.Nil { + mylogrus.MyLog.Errorf("cron micChangeSys redisCli.GetRedis().BLPop err:+%v", err) + } + } + if len(strs) >= 2 { + content := strs[1] + //mylogrus.MyLog.Infof("cron micChangeSys content:%v", content) + micSystemMsg := MicSystemMsg{} + if err := json.Unmarshal([]byte(content), &micSystemMsg); err != nil { + mylogrus.MyLog.Errorf("cron micChangeSys Unmarshal err:%+v, content:%v", err, content) + } + sendChan <- GroupSystemMsg{ + MsgGroupUid: micSystemMsg.GroupUid, + MsgId: micSystemMsg.MsgId, + Source: micSystemMsg.Source, + Target: micSystemMsg.Target, + Content: micSystemMsg.Content, + } + } + } +} + //var limiter = rate.NewLimiter(2000, 2000) func send() {