diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/sentinel.go')
-rw-r--r-- | vendor/github.com/go-redis/redis/sentinel.go | 82 |
1 files changed, 43 insertions, 39 deletions
diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 3cedf36ee..c5f71493d 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -29,13 +29,17 @@ type FailoverOptions struct { Password string DB int - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, } c.baseClient.init() - c.setProcessor(c.Process) + c.cmdable.setProcessor(c.Process) return &c } @@ -116,7 +120,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { } func (c *SentinelClient) PubSub() *PubSub { - return &PubSub{ + pubsub := &PubSub{ opt: c.opt, newConn: func(channels []string) (*pool.Conn, error) { @@ -124,6 +128,8 @@ func (c *SentinelClient) PubSub() *PubSub { }, closeConn: c.connPool.CloseConn, } + pubsub.init() + return pubsub } func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { @@ -180,10 +186,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) { if err != nil { return "", err } - - if d._masterAddr != addr { - d.switchMaster(addr) - } + d._switchMaster(addr) return addr, nil } @@ -194,11 +197,11 @@ func (d *sentinelFailover) masterAddr() (string, error) { addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() if err == nil { addr := net.JoinHostPort(addr[0], addr[1]) - internal.Logf("sentinel: master=%q addr=%q", d.masterName, addr) return addr, nil } - internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", d.masterName, err) + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + d.masterName, err) d._resetSentinel() } @@ -234,15 +237,23 @@ func (d *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (d *sentinelFailover) switchMaster(masterAddr string) { - internal.Logf( - "sentinel: new master=%q addr=%q", - d.masterName, masterAddr, - ) - _ = d.Pool().Filter(func(cn *pool.Conn) bool { - return cn.RemoteAddr().String() != masterAddr +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.Lock() + c._switchMaster(addr) + c.mu.Unlock() +} + +func (c *sentinelFailover) _switchMaster(addr string) { + if c._masterAddr == addr { + return + } + + internal.Logf("sentinel: new master=%q addr=%q", + c.masterName, addr) + _ = c.Pool().Filter(func(cn *pool.Conn) bool { + return cn.RemoteAddr().String() != addr }) - d._masterAddr = masterAddr + c._masterAddr = addr } func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { @@ -292,27 +303,25 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { } func (d *sentinelFailover) listen(sentinel *SentinelClient) { - var pubsub *PubSub - for { - if pubsub == nil { - pubsub = sentinel.PubSub() + pubsub := sentinel.PubSub() + defer pubsub.Close() - if err := pubsub.Subscribe("+switch-master"); err != nil { - internal.Logf("sentinel: Subscribe failed: %s", err) - pubsub.Close() - d.resetSentinel() - return - } - } + err := pubsub.Subscribe("+switch-master") + if err != nil { + internal.Logf("sentinel: Subscribe failed: %s", err) + d.resetSentinel() + return + } + for { msg, err := pubsub.ReceiveMessage() if err != nil { - if err != pool.ErrClosed { - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - pubsub.Close() + if err == pool.ErrClosed { + d.resetSentinel() + return } - d.resetSentinel() - return + internal.Logf("sentinel: ReceiveMessage failed: %s", err) + continue } switch msg.Channel { @@ -323,12 +332,7 @@ func (d *sentinelFailover) listen(sentinel *SentinelClient) { continue } addr := net.JoinHostPort(parts[3], parts[4]) - - d.mu.Lock() - if d._masterAddr != addr { - d.switchMaster(addr) - } - d.mu.Unlock() + d.switchMaster(addr) } } } |