summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/sentinel.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/sentinel.go')
-rw-r--r--vendor/github.com/go-redis/redis/sentinel.go82
1 files changed, 43 insertions, 39 deletions
diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go
index 3cedf36ee..c5f71493d 100644
--- a/vendor/github.com/go-redis/redis/sentinel.go
+++ b/vendor/github.com/go-redis/redis/sentinel.go
@@ -29,13 +29,17 @@ type FailoverOptions struct {
Password string
DB int
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
}
c.baseClient.init()
- c.setProcessor(c.Process)
+ c.cmdable.setProcessor(c.Process)
return &c
}
@@ -116,7 +120,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
}
func (c *SentinelClient) PubSub() *PubSub {
- return &PubSub{
+ pubsub := &PubSub{
opt: c.opt,
newConn: func(channels []string) (*pool.Conn, error) {
@@ -124,6 +128,8 @@ func (c *SentinelClient) PubSub() *PubSub {
},
closeConn: c.connPool.CloseConn,
}
+ pubsub.init()
+ return pubsub
}
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
@@ -180,10 +186,7 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
if err != nil {
return "", err
}
-
- if d._masterAddr != addr {
- d.switchMaster(addr)
- }
+ d._switchMaster(addr)
return addr, nil
}
@@ -194,11 +197,11 @@ func (d *sentinelFailover) masterAddr() (string, error) {
addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if err == nil {
addr := net.JoinHostPort(addr[0], addr[1])
- internal.Logf("sentinel: master=%q addr=%q", d.masterName, addr)
return addr, nil
}
- internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", d.masterName, err)
+ internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
+ d.masterName, err)
d._resetSentinel()
}
@@ -234,15 +237,23 @@ func (d *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}
-func (d *sentinelFailover) switchMaster(masterAddr string) {
- internal.Logf(
- "sentinel: new master=%q addr=%q",
- d.masterName, masterAddr,
- )
- _ = d.Pool().Filter(func(cn *pool.Conn) bool {
- return cn.RemoteAddr().String() != masterAddr
+func (c *sentinelFailover) switchMaster(addr string) {
+ c.mu.Lock()
+ c._switchMaster(addr)
+ c.mu.Unlock()
+}
+
+func (c *sentinelFailover) _switchMaster(addr string) {
+ if c._masterAddr == addr {
+ return
+ }
+
+ internal.Logf("sentinel: new master=%q addr=%q",
+ c.masterName, addr)
+ _ = c.Pool().Filter(func(cn *pool.Conn) bool {
+ return cn.RemoteAddr().String() != addr
})
- d._masterAddr = masterAddr
+ c._masterAddr = addr
}
func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
@@ -292,27 +303,25 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
}
func (d *sentinelFailover) listen(sentinel *SentinelClient) {
- var pubsub *PubSub
- for {
- if pubsub == nil {
- pubsub = sentinel.PubSub()
+ pubsub := sentinel.PubSub()
+ defer pubsub.Close()
- if err := pubsub.Subscribe("+switch-master"); err != nil {
- internal.Logf("sentinel: Subscribe failed: %s", err)
- pubsub.Close()
- d.resetSentinel()
- return
- }
- }
+ err := pubsub.Subscribe("+switch-master")
+ if err != nil {
+ internal.Logf("sentinel: Subscribe failed: %s", err)
+ d.resetSentinel()
+ return
+ }
+ for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
- if err != pool.ErrClosed {
- internal.Logf("sentinel: ReceiveMessage failed: %s", err)
- pubsub.Close()
+ if err == pool.ErrClosed {
+ d.resetSentinel()
+ return
}
- d.resetSentinel()
- return
+ internal.Logf("sentinel: ReceiveMessage failed: %s", err)
+ continue
}
switch msg.Channel {
@@ -323,12 +332,7 @@ func (d *sentinelFailover) listen(sentinel *SentinelClient) {
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
-
- d.mu.Lock()
- if d._masterAddr != addr {
- d.switchMaster(addr)
- }
- d.mu.Unlock()
+ d.switchMaster(addr)
}
}
}