From 8526739066ccb00ccd24b74650a7d7b284442985 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Thu, 21 Jun 2018 13:10:40 -0700 Subject: MM-10934 Update server dependencies. (#8981) * Changing throttled import path. * Upgrading dependencies. --- vendor/github.com/go-redis/redis/cluster.go | 244 ++++++++++++++++----- vendor/github.com/go-redis/redis/command.go | 12 +- vendor/github.com/go-redis/redis/commands.go | 8 +- .../go-redis/redis/internal/pool/pool.go | 205 ++++++++++------- .../go-redis/redis/internal/pool/pool_single.go | 12 +- .../go-redis/redis/internal/pool/pool_sticky.go | 54 ++--- vendor/github.com/go-redis/redis/options.go | 3 +- vendor/github.com/go-redis/redis/redis.go | 27 +-- vendor/github.com/go-redis/redis/result.go | 34 +-- vendor/github.com/go-redis/redis/ring.go | 43 ++-- vendor/github.com/go-redis/redis/sentinel.go | 34 +-- vendor/github.com/go-redis/redis/universal.go | 14 +- 12 files changed, 433 insertions(+), 257 deletions(-) (limited to 'vendor/github.com/go-redis') diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index 4a2951157..0c58c8532 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -2,11 +2,13 @@ package redis import ( "context" + "crypto/tls" "errors" "fmt" "math" "math/rand" "net" + "strings" "sync" "sync/atomic" "time" @@ -34,6 +36,7 @@ type ClusterOptions struct { // Enables read-only commands on slave nodes. ReadOnly bool // Allows routing read-only commands to the closest master or slave node. + // It automatically enables ReadOnly. RouteByLatency bool // Allows routing read-only commands to the random master or slave node. RouteRandomly bool @@ -56,6 +59,8 @@ type ClusterOptions struct { PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration + + TLSConfig *tls.Config } func (opt *ClusterOptions) init() { @@ -117,6 +122,8 @@ func (opt *ClusterOptions) clientOptions() *Options { IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: disableIdleCheck, + + TLSConfig: opt.TLSConfig, } } @@ -145,6 +152,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { return &node } +func (n *clusterNode) String() string { + return n.Client.String() +} + func (n *clusterNode) Close() error { return n.Client.Close() } @@ -215,7 +226,7 @@ type clusterNodes struct { nodeCreateGroup singleflight.Group - generation uint32 + _generation uint32 // atomic } func newClusterNodes(opt *ClusterOptions) *clusterNodes { @@ -272,8 +283,7 @@ func (c *clusterNodes) Addrs() ([]string, error) { } func (c *clusterNodes) NextGeneration() uint32 { - c.generation++ - return c.generation + return atomic.AddUint32(&c._generation, 1) } // GC removes unused nodes. @@ -296,10 +306,9 @@ func (c *clusterNodes) GC(generation uint32) { } } -func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { +func (c *clusterNodes) Get(addr string) (*clusterNode, error) { var node *clusterNode var err error - c.mu.RLock() if c.closed { err = pool.ErrClosed @@ -307,6 +316,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { node = c.allNodes[addr] } c.mu.RUnlock() + return node, err +} + +func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { + node, err := c.Get(addr) if err != nil { return nil, err } @@ -371,20 +385,25 @@ func (c *clusterNodes) Random() (*clusterNode, error) { type clusterState struct { nodes *clusterNodes - masters []*clusterNode - slaves []*clusterNode + Masters []*clusterNode + Slaves []*clusterNode slots [][]*clusterNode generation uint32 + createdAt time.Time } -func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) { +func newClusterState( + nodes *clusterNodes, slots []ClusterSlot, origin string, +) (*clusterState, error) { c := clusterState{ - nodes: nodes, - generation: nodes.NextGeneration(), + nodes: nodes, slots: make([][]*clusterNode, hashtag.SlotNumber), + + generation: nodes.NextGeneration(), + createdAt: time.Now(), } isLoopbackOrigin := isLoopbackAddr(origin) @@ -392,7 +411,7 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (* var nodes []*clusterNode for i, slotNode := range slot.Nodes { addr := slotNode.Addr - if !isLoopbackOrigin && isLoopbackAddr(addr) { + if !isLoopbackOrigin && useOriginAddr(origin, addr) { addr = origin } @@ -405,9 +424,9 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (* nodes = append(nodes, node) if i == 0 { - c.masters = appendNode(c.masters, node) + c.Masters = appendUniqueNode(c.Masters, node) } else { - c.slaves = appendNode(c.slaves, node) + c.Slaves = appendUniqueNode(c.Slaves, node) } } @@ -489,6 +508,28 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { return nil } +func (c *clusterState) IsConsistent() bool { + if len(c.Masters) > len(c.Slaves) { + return false + } + + for _, master := range c.Masters { + s := master.Client.Info("replication").Val() + if !strings.Contains(s, "role:master") { + return false + } + } + + for _, slave := range c.Slaves { + s := slave.Client.Info("replication").Val() + if !strings.Contains(s, "role:slave") { + return false + } + } + + return true +} + //------------------------------------------------------------------------------ type clusterStateHolder struct { @@ -496,8 +537,8 @@ type clusterStateHolder struct { state atomic.Value - lastErrMu sync.RWMutex - lastErr error + firstErrMu sync.RWMutex + firstErr error reloading uint32 // atomic } @@ -508,12 +549,25 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder } } -func (c *clusterStateHolder) Load() (*clusterState, error) { +func (c *clusterStateHolder) Reload() (*clusterState, error) { + state, err := c.reload() + if err != nil { + return nil, err + } + if !state.IsConsistent() { + c.LazyReload() + } + return state, nil +} + +func (c *clusterStateHolder) reload() (*clusterState, error) { state, err := c.load() if err != nil { - c.lastErrMu.Lock() - c.lastErr = err - c.lastErrMu.Unlock() + c.firstErrMu.Lock() + if c.firstErr == nil { + c.firstErr = err + } + c.firstErrMu.Unlock() return nil, err } c.state.Store(state) @@ -527,9 +581,15 @@ func (c *clusterStateHolder) LazyReload() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - _, err := c.Load() - if err == nil { - time.Sleep(time.Second) + for { + state, err := c.reload() + if err != nil { + return + } + time.Sleep(100 * time.Millisecond) + if state.IsConsistent() { + return + } } }() } @@ -537,12 +597,16 @@ func (c *clusterStateHolder) LazyReload() { func (c *clusterStateHolder) Get() (*clusterState, error) { v := c.state.Load() if v != nil { - return v.(*clusterState), nil + state := v.(*clusterState) + if time.Since(state.createdAt) > time.Minute { + c.LazyReload() + } + return state, nil } - c.lastErrMu.RLock() - err := c.lastErr - c.lastErrMu.RUnlock() + c.firstErrMu.RLock() + err := c.firstErr + c.firstErrMu.RUnlock() if err != nil { return nil, err } @@ -576,19 +640,19 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { opt.init() c := &ClusterClient{ - opt: opt, - nodes: newClusterNodes(opt), - cmdsInfoCache: newCmdsInfoCache(), + opt: opt, + nodes: newClusterNodes(opt), } c.state = newClusterStateHolder(c.loadState) + c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) c.process = c.defaultProcess c.processPipeline = c.defaultProcessPipeline c.processTxPipeline = c.defaultProcessTxPipeline - c.cmdable.setProcessor(c.Process) + c.init() - _, _ = c.state.Load() + _, _ = c.state.Reload() if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -596,6 +660,10 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + func (c *ClusterClient) Context() context.Context { if c.ctx != nil { return c.ctx @@ -614,6 +682,7 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { func (c *ClusterClient) copy() *ClusterClient { cp := *c + cp.init() return &cp } @@ -626,17 +695,39 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration { return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) } -func (c *ClusterClient) cmdInfo(name string) *CommandInfo { - cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { - node, err := c.nodes.Random() +func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) { + addrs, err := c.nodes.Addrs() + if err != nil { + return nil, err + } + + var firstErr error + for _, addr := range addrs { + node, err := c.nodes.Get(addr) if err != nil { return nil, err } - return node.Client.Command().Result() - }) + if node == nil { + continue + } + + info, err := node.Client.Command().Result() + if err == nil { + return info, nil + } + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *ClusterClient) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() if err != nil { return nil } + info := cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) @@ -700,13 +791,14 @@ func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if len(keys) == 0 { - return fmt.Errorf("redis: keys don't hash to the same slot") + return fmt.Errorf("redis: Watch requires at least one key") } slot := hashtag.Slot(keys[0]) for _, key := range keys[1:] { if hashtag.Slot(key) != slot { - return fmt.Errorf("redis: Watch requires all keys to be in the same slot") + err := fmt.Errorf("redis: Watch requires all keys to be in the same slot") + return err } } @@ -812,6 +904,12 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { } if internal.IsRetryableError(err, true) { + // Firstly retry the same node. + if attempt == 0 { + continue + } + + // Secondly try random node. node, err = c.nodes.Random() if err != nil { break @@ -846,14 +944,17 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup errCh := make(chan error, 1) - for _, master := range state.masters { + for _, master := range state.Masters { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -879,14 +980,17 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup errCh := make(chan error, 1) - for _, slave := range state.slaves { + for _, slave := range state.Slaves { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -912,9 +1016,12 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state, err := c.state.Get() + state, err := c.state.Reload() if err != nil { - return err + state, err = c.state.Get() + if err != nil { + return err + } } var wg sync.WaitGroup @@ -930,11 +1037,11 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { } } - for _, node := range state.masters { + for _, node := range state.Masters { wg.Add(1) go worker(node) } - for _, node := range state.slaves { + for _, node := range state.Slaves { wg.Add(1) go worker(node) } @@ -957,7 +1064,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { return &acc } - for _, node := range state.masters { + for _, node := range state.Masters { s := node.Client.connPool.Stats() acc.Hits += s.Hits acc.Misses += s.Misses @@ -968,7 +1075,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.StaleConns += s.StaleConns } - for _, node := range state.slaves { + for _, node := range state.Slaves { s := node.Client.connPool.Stats() acc.Hits += s.Hits acc.Misses += s.Misses @@ -1065,7 +1172,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.getConn() + cn, err := node.Client.getConn() if err != nil { if err == pool.ErrClosed { c.remapCmds(cmds, failedCmds) @@ -1077,9 +1184,9 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) if err == nil || internal.IsRedisError(err) { - _ = node.Client.connPool.Put(cn) + node.Client.connPool.Put(cn) } else { - _ = node.Client.connPool.Remove(cn) + node.Client.connPool.Remove(cn) } } @@ -1229,7 +1336,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.getConn() + cn, err := node.Client.getConn() if err != nil { if err == pool.ErrClosed { c.remapCmds(cmds, failedCmds) @@ -1241,9 +1348,9 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) if err == nil || internal.IsRedisError(err) { - _ = node.Client.connPool.Put(cn) + node.Client.connPool.Put(cn) } else { - _ = node.Client.connPool.Remove(cn) + node.Client.connPool.Remove(cn) } } @@ -1387,6 +1494,29 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { return pubsub } +func useOriginAddr(originAddr, nodeAddr string) bool { + nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) + if err != nil { + return false + } + + nodeIP := net.ParseIP(nodeHost) + if nodeIP == nil { + return false + } + + if !nodeIP.IsLoopback() { + return false + } + + _, originPort, err := net.SplitHostPort(originAddr) + if err != nil { + return false + } + + return nodePort == originPort +} + func isLoopbackAddr(addr string) bool { host, _, err := net.SplitHostPort(addr) if err != nil { @@ -1401,7 +1531,7 @@ func isLoopbackAddr(addr string) bool { return ip.IsLoopback() } -func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { +func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { for _, n := range nodes { if n == node { return nodes diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index 1588ca251..552c897bb 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -1027,17 +1027,21 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { //------------------------------------------------------------------------------ type cmdsInfoCache struct { + fn func() (map[string]*CommandInfo, error) + once internal.Once cmds map[string]*CommandInfo } -func newCmdsInfoCache() *cmdsInfoCache { - return &cmdsInfoCache{} +func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache { + return &cmdsInfoCache{ + fn: fn, + } } -func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) { +func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) { err := c.once.Do(func() error { - cmds, err := fn() + cmds, err := c.fn() if err != nil { return err } diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index a3dacacd2..c6a88154e 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -266,6 +266,8 @@ type Cmdable interface { GeoDist(key string, member1, member2, unit string) *FloatCmd GeoHash(key string, members ...string) *StringSliceCmd Command() *CommandsInfoCmd + ReadOnly() *StatusCmd + ReadWrite() *StatusCmd } type StatefulCmdable interface { @@ -274,8 +276,6 @@ type StatefulCmdable interface { Select(index int) *StatusCmd SwapDB(index1, index2 int) *StatusCmd ClientSetName(name string) *BoolCmd - ReadOnly() *StatusCmd - ReadWrite() *StatusCmd } var _ Cmdable = (*Client)(nil) @@ -2054,13 +2054,13 @@ func (c *cmdable) ClusterSlaves(nodeID string) *StringSliceCmd { return cmd } -func (c *statefulCmdable) ReadOnly() *StatusCmd { +func (c *cmdable) ReadOnly() *StatusCmd { cmd := NewStatusCmd("readonly") c.process(cmd) return cmd } -func (c *statefulCmdable) ReadWrite() *StatusCmd { +func (c *cmdable) ReadWrite() *StatusCmd { cmd := NewStatusCmd("readwrite") c.process(cmd) return cmd diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index ae81905ea..cab66904a 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,8 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // number of free connections in the pool + FreeConns uint32 // deprecated - use IdleConns + IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -36,12 +37,12 @@ type Pooler interface { NewConn() (*Conn, error) CloseConn(*Conn) error - Get() (*Conn, bool, error) - Put(*Conn) error - Remove(*Conn) error + Get() (*Conn, error) + Put(*Conn) + Remove(*Conn) Len() int - FreeLen() int + IdleLen() int Stats() *Stats Close() error @@ -70,8 +71,8 @@ type ConnPool struct { connsMu sync.Mutex conns []*Conn - freeConnsMu sync.Mutex - freeConns []*Conn + idleConnsMu sync.RWMutex + idleConns []*Conn stats Stats @@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool { queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), - freeConns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } + return p } func (p *ConnPool) NewConn() (*Conn, error) { + cn, err := p.newConn() + if err != nil { + return nil, err + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.connsMu.Unlock() + return cn, nil +} + +func (p *ConnPool) newConn() (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) { return nil, err } - cn := NewConn(netConn) - p.connsMu.Lock() - p.conns = append(p.conns, cn) - p.connsMu.Unlock() - - return cn, nil + return NewConn(netConn), nil } func (p *ConnPool) tryDial() { @@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error { } // Get returns existed connection from the pool or creates a new one. -func (p *ConnPool) Get() (*Conn, bool, error) { +func (p *ConnPool) Get() (*Conn, error) { if p.closed() { - return nil, false, ErrClosed + return nil, ErrClosed } - select { - case p.queue <- struct{}{}: - default: - timer := timers.Get().(*time.Timer) - timer.Reset(p.opt.PoolTimeout) - - select { - case p.queue <- struct{}{}: - if !timer.Stop() { - <-timer.C - } - timers.Put(timer) - case <-timer.C: - timers.Put(timer) - atomic.AddUint32(&p.stats.Timeouts, 1) - return nil, false, ErrPoolTimeout - } + err := p.waitTurn() + if err != nil { + return nil, err } for { - p.freeConnsMu.Lock() - cn := p.popFree() - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + cn := p.popIdle() + p.idleConnsMu.Unlock() if cn == nil { break @@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) { } atomic.AddUint32(&p.stats.Hits, 1) - return cn, false, nil + return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) newcn, err := p.NewConn() if err != nil { - <-p.queue - return nil, false, err + p.freeTurn() + return nil, err } - return newcn, true, nil + return newcn, nil +} + +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} } -func (p *ConnPool) popFree() *Conn { - if len(p.freeConns) == 0 { +func (p *ConnPool) waitTurn() error { + select { + case p.queue <- struct{}{}: + return nil + default: + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() *Conn { + if len(p.idleConns) == 0 { return nil } - idx := len(p.freeConns) - 1 - cn := p.freeConns[idx] - p.freeConns = p.freeConns[:idx] + idx := len(p.idleConns) - 1 + cn := p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + return cn } -func (p *ConnPool) Put(cn *Conn) error { - if data := cn.Rd.PeekBuffered(); data != nil { - internal.Logf("connection has unread data: %q", data) - return p.Remove(cn) +func (p *ConnPool) Put(cn *Conn) { + buf := cn.Rd.PeekBuffered() + if buf != nil { + internal.Logf("connection has unread data: %.100q", buf) + p.Remove(cn) + return } - p.freeConnsMu.Lock() - p.freeConns = append(p.freeConns, cn) - p.freeConnsMu.Unlock() - <-p.queue - return nil + + p.idleConnsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsMu.Unlock() + p.freeTurn() } -func (p *ConnPool) Remove(cn *Conn) error { - _ = p.CloseConn(cn) - <-p.queue - return nil +func (p *ConnPool) Remove(cn *Conn) { + p.removeConn(cn) + p.freeTurn() + _ = p.closeConn(cn) } func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConn(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConn(cn *Conn) { p.connsMu.Lock() for i, c := range p.conns { if c == cn { @@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error { } } p.connsMu.Unlock() - - return p.closeConn(cn) } func (p *ConnPool) closeConn(cn *Conn) error { @@ -263,22 +296,24 @@ func (p *ConnPool) Len() int { return l } -// FreeLen returns number of free connections. -func (p *ConnPool) FreeLen() int { - p.freeConnsMu.Lock() - l := len(p.freeConns) - p.freeConnsMu.Unlock() +// FreeLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.idleConnsMu.RLock() + l := len(p.idleConns) + p.idleConnsMu.RUnlock() return l } func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() return &Stats{ Hits: atomic.LoadUint32(&p.stats.Hits), Misses: atomic.LoadUint32(&p.stats.Misses), Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(p.FreeLen()), + FreeConns: uint32(idleLen), + IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -316,41 +351,45 @@ func (p *ConnPool) Close() error { p.conns = nil p.connsMu.Unlock() - p.freeConnsMu.Lock() - p.freeConns = nil - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + p.idleConns = nil + p.idleConnsMu.Unlock() return firstErr } -func (p *ConnPool) reapStaleConn() bool { - if len(p.freeConns) == 0 { - return false +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil } - cn := p.freeConns[0] + cn := p.idleConns[0] if !cn.IsStale(p.opt.IdleTimeout) { - return false + return nil } - p.CloseConn(cn) - p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...) + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) - return true + return cn } func (p *ConnPool) ReapStaleConns() (int, error) { var n int for { - p.queue <- struct{}{} - p.freeConnsMu.Lock() + p.getTurn() - reaped := p.reapStaleConn() + p.idleConnsMu.Lock() + cn := p.reapStaleConn() + p.idleConnsMu.Unlock() + + if cn != nil { + p.removeConn(cn) + } - p.freeConnsMu.Unlock() - <-p.queue + p.freeTurn() - if reaped { + if cn != nil { + p.closeConn(cn) n++ } else { break diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go index ff91279b3..b35b78afb 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go @@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *SingleConnPool) Get() (*Conn, bool, error) { - return p.cn, false, nil +func (p *SingleConnPool) Get() (*Conn, error) { + return p.cn, nil } -func (p *SingleConnPool) Put(cn *Conn) error { +func (p *SingleConnPool) Put(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } -func (p *SingleConnPool) Remove(cn *Conn) error { +func (p *SingleConnPool) Remove(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } func (p *SingleConnPool) Len() int { return 1 } -func (p *SingleConnPool) FreeLen() int { +func (p *SingleConnPool) IdleLen() int { return 0 } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go index 17f163858..91bd91333 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go @@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *StickyConnPool) Get() (*Conn, bool, error) { +func (p *StickyConnPool) Get() (*Conn, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { - return nil, false, ErrClosed + return nil, ErrClosed } if p.cn != nil { - return p.cn, false, nil + return p.cn, nil } - cn, _, err := p.pool.Get() + cn, err := p.pool.Get() if err != nil { - return nil, false, err + return nil, err } + p.cn = cn - return cn, true, nil + return cn, nil } -func (p *StickyConnPool) putUpstream() (err error) { - err = p.pool.Put(p.cn) +func (p *StickyConnPool) putUpstream() { + p.pool.Put(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Put(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return ErrClosed - } - return nil -} +func (p *StickyConnPool) Put(cn *Conn) {} -func (p *StickyConnPool) removeUpstream() error { - err := p.pool.Remove(p.cn) +func (p *StickyConnPool) removeUpstream() { + p.pool.Remove(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Remove(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return nil - } - return p.removeUpstream() +func (p *StickyConnPool) Remove(cn *Conn) { + p.removeUpstream() } func (p *StickyConnPool) Len() int { @@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int { return 1 } -func (p *StickyConnPool) FreeLen() int { +func (p *StickyConnPool) IdleLen() int { p.mu.Lock() defer p.mu.Unlock() @@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error { return ErrClosed } p.closed = true - var err error + if p.cn != nil { if p.reusable { - err = p.putUpstream() + p.putUpstream() } else { - err = p.removeUpstream() + p.removeUpstream() } } - return err + + return nil } diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go index 75648053d..35ce06195 100644 --- a/vendor/github.com/go-redis/redis/options.go +++ b/vendor/github.com/go-redis/redis/options.go @@ -68,8 +68,7 @@ type Options struct { // Default is 5 minutes. IdleTimeout time.Duration // Frequency of idle checks. - // Default is 1 minute. - // When minus value is set, then idle check is disabled. + // Default is 1 minute. -1 disables idle check. IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index 7a606b70e..beb632e1e 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -60,29 +60,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return cn, nil } -func (c *baseClient) getConn() (*pool.Conn, bool, error) { - cn, isNew, err := c.connPool.Get() +func (c *baseClient) getConn() (*pool.Conn, error) { + cn, err := c.connPool.Get() if err != nil { - return nil, false, err + return nil, err } if !cn.Inited { - if err := c.initConn(cn); err != nil { - _ = c.connPool.Remove(cn) - return nil, false, err + err := c.initConn(cn) + if err != nil { + c.connPool.Remove(cn) + return nil, err } } - return cn, isNew, nil + return cn, nil } func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { if internal.IsBadConn(err, false) { - _ = c.connPool.Remove(cn) + c.connPool.Remove(cn) return false } - _ = c.connPool.Put(cn) + c.connPool.Put(cn) return true } @@ -137,7 +138,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - cn, _, err := c.getConn() + cn, err := c.getConn() if err != nil { cmd.setErr(err) if internal.IsRetryableError(err, true) { @@ -225,7 +226,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e time.Sleep(c.retryBackoff(attempt)) } - cn, _, err := c.getConn() + cn, err := c.getConn() if err != nil { setCmdsErr(cmds, err) return err @@ -234,10 +235,10 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e canRetry, err := p(cn, cmds) if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) + c.connPool.Put(cn) break } - _ = c.connPool.Remove(cn) + c.connPool.Remove(cn) if !canRetry || !internal.IsRetryableError(err, true) { break diff --git a/vendor/github.com/go-redis/redis/result.go b/vendor/github.com/go-redis/redis/result.go index 28cea5ca8..e086e8e34 100644 --- a/vendor/github.com/go-redis/redis/result.go +++ b/vendor/github.com/go-redis/redis/result.go @@ -2,7 +2,7 @@ package redis import "time" -// NewCmdResult returns a Cmd initalised with val and err for testing +// NewCmdResult returns a Cmd initialised with val and err for testing func NewCmdResult(val interface{}, err error) *Cmd { var cmd Cmd cmd.val = val @@ -10,7 +10,7 @@ func NewCmdResult(val interface{}, err error) *Cmd { return &cmd } -// NewSliceResult returns a SliceCmd initalised with val and err for testing +// NewSliceResult returns a SliceCmd initialised with val and err for testing func NewSliceResult(val []interface{}, err error) *SliceCmd { var cmd SliceCmd cmd.val = val @@ -18,7 +18,7 @@ func NewSliceResult(val []interface{}, err error) *SliceCmd { return &cmd } -// NewStatusResult returns a StatusCmd initalised with val and err for testing +// NewStatusResult returns a StatusCmd initialised with val and err for testing func NewStatusResult(val string, err error) *StatusCmd { var cmd StatusCmd cmd.val = val @@ -26,7 +26,7 @@ func NewStatusResult(val string, err error) *StatusCmd { return &cmd } -// NewIntResult returns an IntCmd initalised with val and err for testing +// NewIntResult returns an IntCmd initialised with val and err for testing func NewIntResult(val int64, err error) *IntCmd { var cmd IntCmd cmd.val = val @@ -34,7 +34,7 @@ func NewIntResult(val int64, err error) *IntCmd { return &cmd } -// NewDurationResult returns a DurationCmd initalised with val and err for testing +// NewDurationResult returns a DurationCmd initialised with val and err for testing func NewDurationResult(val time.Duration, err error) *DurationCmd { var cmd DurationCmd cmd.val = val @@ -42,7 +42,7 @@ func NewDurationResult(val time.Duration, err error) *DurationCmd { return &cmd } -// NewBoolResult returns a BoolCmd initalised with val and err for testing +// NewBoolResult returns a BoolCmd initialised with val and err for testing func NewBoolResult(val bool, err error) *BoolCmd { var cmd BoolCmd cmd.val = val @@ -50,7 +50,7 @@ func NewBoolResult(val bool, err error) *BoolCmd { return &cmd } -// NewStringResult returns a StringCmd initalised with val and err for testing +// NewStringResult returns a StringCmd initialised with val and err for testing func NewStringResult(val string, err error) *StringCmd { var cmd StringCmd cmd.val = []byte(val) @@ -58,7 +58,7 @@ func NewStringResult(val string, err error) *StringCmd { return &cmd } -// NewFloatResult returns a FloatCmd initalised with val and err for testing +// NewFloatResult returns a FloatCmd initialised with val and err for testing func NewFloatResult(val float64, err error) *FloatCmd { var cmd FloatCmd cmd.val = val @@ -66,7 +66,7 @@ func NewFloatResult(val float64, err error) *FloatCmd { return &cmd } -// NewStringSliceResult returns a StringSliceCmd initalised with val and err for testing +// NewStringSliceResult returns a StringSliceCmd initialised with val and err for testing func NewStringSliceResult(val []string, err error) *StringSliceCmd { var cmd StringSliceCmd cmd.val = val @@ -74,7 +74,7 @@ func NewStringSliceResult(val []string, err error) *StringSliceCmd { return &cmd } -// NewBoolSliceResult returns a BoolSliceCmd initalised with val and err for testing +// NewBoolSliceResult returns a BoolSliceCmd initialised with val and err for testing func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd { var cmd BoolSliceCmd cmd.val = val @@ -82,7 +82,7 @@ func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd { return &cmd } -// NewStringStringMapResult returns a StringStringMapCmd initalised with val and err for testing +// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd { var cmd StringStringMapCmd cmd.val = val @@ -90,7 +90,7 @@ func NewStringStringMapResult(val map[string]string, err error) *StringStringMap return &cmd } -// NewStringIntMapCmdResult returns a StringIntMapCmd initalised with val and err for testing +// NewStringIntMapCmdResult returns a StringIntMapCmd initialised with val and err for testing func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd { var cmd StringIntMapCmd cmd.val = val @@ -98,7 +98,7 @@ func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd return &cmd } -// NewZSliceCmdResult returns a ZSliceCmd initalised with val and err for testing +// NewZSliceCmdResult returns a ZSliceCmd initialised with val and err for testing func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd { var cmd ZSliceCmd cmd.val = val @@ -106,7 +106,7 @@ func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd { return &cmd } -// NewScanCmdResult returns a ScanCmd initalised with val and err for testing +// NewScanCmdResult returns a ScanCmd initialised with val and err for testing func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd { var cmd ScanCmd cmd.page = keys @@ -115,7 +115,7 @@ func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd { return &cmd } -// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initalised with val and err for testing +// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initialised with val and err for testing func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd { var cmd ClusterSlotsCmd cmd.val = val @@ -123,7 +123,7 @@ func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd { return &cmd } -// NewGeoLocationCmdResult returns a GeoLocationCmd initalised with val and err for testing +// NewGeoLocationCmdResult returns a GeoLocationCmd initialised with val and err for testing func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd { var cmd GeoLocationCmd cmd.locations = val @@ -131,7 +131,7 @@ func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd { return &cmd } -// NewCommandsInfoCmdResult returns a CommandsInfoCmd initalised with val and err for testing +// NewCommandsInfoCmdResult returns a CommandsInfoCmd initialised with val and err for testing func NewCommandsInfoCmdResult(val map[string]*CommandInfo, err error) *CommandsInfoCmd { var cmd CommandsInfoCmd cmd.val = val diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index 6d2877413..b47a1094e 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -304,10 +304,11 @@ func NewRing(opt *RingOptions) *Ring { opt.init() ring := &Ring{ - opt: opt, - shards: newRingShards(), - cmdsInfoCache: newCmdsInfoCache(), + opt: opt, + shards: newRingShards(), } + ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -428,21 +429,23 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error { } } -func (c *Ring) cmdInfo(name string) *CommandInfo { - cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) { - shards := c.shards.List() - firstErr := errRingShardsDown - for _, shard := range shards { - cmdsInfo, err := shard.Client.Command().Result() - if err == nil { - return cmdsInfo, nil - } - if firstErr == nil { - firstErr = err - } +func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) { + shards := c.shards.List() + firstErr := errRingShardsDown + for _, shard := range shards { + cmdsInfo, err := shard.Client.Command().Result() + if err == nil { + return cmdsInfo, nil } - return nil, firstErr - }) + if firstErr == nil { + firstErr = err + } + } + return nil, firstErr +} + +func (c *Ring) cmdInfo(name string) *CommandInfo { + cmdsInfo, err := c.cmdsInfoCache.Get() if err != nil { return nil } @@ -522,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { continue } - cn, _, err := shard.Client.getConn() + cn, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue @@ -530,10 +533,10 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) if err == nil || internal.IsRedisError(err) { - _ = shard.Client.connPool.Put(cn) + shard.Client.connPool.Put(cn) continue } - _ = shard.Client.connPool.Remove(cn) + shard.Client.connPool.Remove(cn) if canRetry && internal.IsRetryableError(err, true) { if failedCmdsMap == nil { diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 3f56f08b3..3cedf36ee 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -1,6 +1,7 @@ package redis import ( + "crypto/tls" "errors" "net" "strings" @@ -38,6 +39,8 @@ type FailoverOptions struct { PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration + + TLSConfig *tls.Config } func (opt *FailoverOptions) options() *Options { @@ -59,6 +62,8 @@ func (opt *FailoverOptions) options() *Options { PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, + + TLSConfig: opt.TLSConfig, } } @@ -94,25 +99,23 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { //------------------------------------------------------------------------------ -type sentinelClient struct { - cmdable +type SentinelClient struct { baseClient } -func newSentinel(opt *Options) *sentinelClient { +func NewSentinelClient(opt *Options) *SentinelClient { opt.init() - c := sentinelClient{ + c := &SentinelClient{ baseClient: baseClient{ opt: opt, connPool: newConnPool(opt), }, } c.baseClient.init() - c.cmdable.setProcessor(c.Process) - return &c + return c } -func (c *sentinelClient) PubSub() *PubSub { +func (c *SentinelClient) PubSub() *PubSub { return &PubSub{ opt: c.opt, @@ -123,13 +126,13 @@ func (c *sentinelClient) PubSub() *PubSub { } } -func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { +func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) c.Process(cmd) return cmd } -func (c *sentinelClient) Sentinels(name string) *SliceCmd { +func (c *SentinelClient) Sentinels(name string) *SliceCmd { cmd := NewSliceCmd("SENTINEL", "sentinels", name) c.Process(cmd) return cmd @@ -146,7 +149,7 @@ type sentinelFailover struct { mu sync.RWMutex masterName string _masterAddr string - sentinel *sentinelClient + sentinel *SentinelClient } func (d *sentinelFailover) Close() error { @@ -200,7 +203,7 @@ func (d *sentinelFailover) masterAddr() (string, error) { } for i, sentinelAddr := range d.sentinelAddrs { - sentinel := newSentinel(&Options{ + sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, DialTimeout: d.opt.DialTimeout, @@ -214,7 +217,8 @@ func (d *sentinelFailover) masterAddr() (string, error) { masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() if err != nil { - internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", d.masterName, err) + internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", + d.masterName, err) sentinel.Close() continue } @@ -241,7 +245,7 @@ func (d *sentinelFailover) switchMaster(masterAddr string) { d._masterAddr = masterAddr } -func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) { +func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { d.discoverSentinels(sentinel) d.sentinel = sentinel go d.listen(sentinel) @@ -263,7 +267,7 @@ func (d *sentinelFailover) _resetSentinel() error { return err } -func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { +func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { sentinels, err := sentinel.Sentinels(d.masterName).Result() if err != nil { internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err) @@ -287,7 +291,7 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { } } -func (d *sentinelFailover) listen(sentinel *sentinelClient) { +func (d *sentinelFailover) listen(sentinel *SentinelClient) { var pubsub *PubSub for { if pubsub == nil { diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go index fde3c4150..9e30c81d9 100644 --- a/vendor/github.com/go-redis/redis/universal.go +++ b/vendor/github.com/go-redis/redis/universal.go @@ -1,6 +1,9 @@ package redis -import "time" +import ( + "crypto/tls" + "time" +) // UniversalOptions information is required by UniversalClient to establish // connections. @@ -27,6 +30,7 @@ type UniversalOptions struct { // Common options + OnConnect func(*Conn) error MaxRetries int Password string DialTimeout time.Duration @@ -36,6 +40,7 @@ type UniversalOptions struct { PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration + TLSConfig *tls.Config } func (o *UniversalOptions) cluster() *ClusterOptions { @@ -49,6 +54,7 @@ func (o *UniversalOptions) cluster() *ClusterOptions { RouteByLatency: o.RouteByLatency, ReadOnly: o.ReadOnly, + OnConnect: o.OnConnect, MaxRetries: o.MaxRetries, Password: o.Password, DialTimeout: o.DialTimeout, @@ -58,6 +64,7 @@ func (o *UniversalOptions) cluster() *ClusterOptions { PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, + TLSConfig: o.TLSConfig, } } @@ -71,6 +78,7 @@ func (o *UniversalOptions) failover() *FailoverOptions { MasterName: o.MasterName, DB: o.DB, + OnConnect: o.OnConnect, MaxRetries: o.MaxRetries, Password: o.Password, DialTimeout: o.DialTimeout, @@ -80,6 +88,7 @@ func (o *UniversalOptions) failover() *FailoverOptions { PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, + TLSConfig: o.TLSConfig, } } @@ -93,6 +102,7 @@ func (o *UniversalOptions) simple() *Options { Addr: addr, DB: o.DB, + OnConnect: o.OnConnect, MaxRetries: o.MaxRetries, Password: o.Password, DialTimeout: o.DialTimeout, @@ -102,6 +112,7 @@ func (o *UniversalOptions) simple() *Options { PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, + TLSConfig: o.TLSConfig, } } @@ -113,6 +124,7 @@ func (o *UniversalOptions) simple() *Options { // applications locally. type UniversalClient interface { Cmdable + Watch(fn func(*Tx) error, keys ...string) error Process(cmd Cmder) error WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) Subscribe(channels ...string) *PubSub -- cgit v1.2.3-1-g7c22