author    Christopher Speller <crspeller@gmail.com>  2018-06-21 13:10:40 -0700
committer Harrison Healey <harrisonmhealey@gmail.com>  2018-06-21 16:10:40 -0400
commit    8526739066ccb00ccd24b74650a7d7b284442985 (patch)
tree      282512ae2ad95c98a9ca82de304a410b6b56685c /vendor/github.com/go-redis
parent    a59ccaa8b3844895dde3980e6224fef46ff4a1c8 (diff)
MM-10934 Update server dependencies. (#8981)
* Changing throttled import path.
* Upgrading dependencies.
Diffstat (limited to 'vendor/github.com/go-redis')
-rw-r--r--  vendor/github.com/go-redis/redis/cluster.go                    | 244
-rw-r--r--  vendor/github.com/go-redis/redis/command.go                    |  12
-rw-r--r--  vendor/github.com/go-redis/redis/commands.go                   |   8
-rw-r--r--  vendor/github.com/go-redis/redis/internal/pool/pool.go         | 205
-rw-r--r--  vendor/github.com/go-redis/redis/internal/pool/pool_single.go  |  12
-rw-r--r--  vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go  |  54
-rw-r--r--  vendor/github.com/go-redis/redis/options.go                    |   3
-rw-r--r--  vendor/github.com/go-redis/redis/redis.go                      |  27
-rw-r--r--  vendor/github.com/go-redis/redis/result.go                     |  34
-rw-r--r--  vendor/github.com/go-redis/redis/ring.go                       |  43
-rw-r--r--  vendor/github.com/go-redis/redis/sentinel.go                   |  34
-rw-r--r--  vendor/github.com/go-redis/redis/universal.go                  |  14
12 files changed, 433 insertions(+), 257 deletions(-)
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index 4a2951157..0c58c8532 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -2,11 +2,13 @@ package redis
import (
"context"
+ "crypto/tls"
"errors"
"fmt"
"math"
"math/rand"
"net"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -34,6 +36,7 @@ type ClusterOptions struct {
// Enables read-only commands on slave nodes.
ReadOnly bool
// Allows routing read-only commands to the closest master or slave node.
+ // It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
RouteRandomly bool
@@ -56,6 +59,8 @@ type ClusterOptions struct {
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
+
+ TLSConfig *tls.Config
}
func (opt *ClusterOptions) init() {
@@ -117,6 +122,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
+
+ TLSConfig: opt.TLSConfig,
}
}
@@ -145,6 +152,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
return &node
}
+func (n *clusterNode) String() string {
+ return n.Client.String()
+}
+
func (n *clusterNode) Close() error {
return n.Client.Close()
}
@@ -215,7 +226,7 @@ type clusterNodes struct {
nodeCreateGroup singleflight.Group
- generation uint32
+ _generation uint32 // atomic
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
@@ -272,8 +283,7 @@ func (c *clusterNodes) Addrs() ([]string, error) {
}
func (c *clusterNodes) NextGeneration() uint32 {
- c.generation++
- return c.generation
+ return atomic.AddUint32(&c._generation, 1)
}
// GC removes unused nodes.
@@ -296,10 +306,9 @@ func (c *clusterNodes) GC(generation uint32) {
}
}
-func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
+func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
var node *clusterNode
var err error
-
c.mu.RLock()
if c.closed {
err = pool.ErrClosed
@@ -307,6 +316,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node = c.allNodes[addr]
}
c.mu.RUnlock()
+ return node, err
+}
+
+func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
+ node, err := c.Get(addr)
if err != nil {
return nil, err
}
@@ -371,20 +385,25 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
type clusterState struct {
nodes *clusterNodes
- masters []*clusterNode
- slaves []*clusterNode
+ Masters []*clusterNode
+ Slaves []*clusterNode
slots [][]*clusterNode
generation uint32
+ createdAt time.Time
}
-func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) {
+func newClusterState(
+ nodes *clusterNodes, slots []ClusterSlot, origin string,
+) (*clusterState, error) {
c := clusterState{
- nodes: nodes,
- generation: nodes.NextGeneration(),
+ nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber),
+
+ generation: nodes.NextGeneration(),
+ createdAt: time.Now(),
}
isLoopbackOrigin := isLoopbackAddr(origin)
@@ -392,7 +411,7 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
- if !isLoopbackOrigin && isLoopbackAddr(addr) {
+ if !isLoopbackOrigin && useOriginAddr(origin, addr) {
addr = origin
}
@@ -405,9 +424,9 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*
nodes = append(nodes, node)
if i == 0 {
- c.masters = appendNode(c.masters, node)
+ c.Masters = appendUniqueNode(c.Masters, node)
} else {
- c.slaves = appendNode(c.slaves, node)
+ c.Slaves = appendUniqueNode(c.Slaves, node)
}
}
@@ -489,6 +508,28 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
+func (c *clusterState) IsConsistent() bool {
+ if len(c.Masters) > len(c.Slaves) {
+ return false
+ }
+
+ for _, master := range c.Masters {
+ s := master.Client.Info("replication").Val()
+ if !strings.Contains(s, "role:master") {
+ return false
+ }
+ }
+
+ for _, slave := range c.Slaves {
+ s := slave.Client.Info("replication").Val()
+ if !strings.Contains(s, "role:slave") {
+ return false
+ }
+ }
+
+ return true
+}
+
//------------------------------------------------------------------------------
type clusterStateHolder struct {
@@ -496,8 +537,8 @@ type clusterStateHolder struct {
state atomic.Value
- lastErrMu sync.RWMutex
- lastErr error
+ firstErrMu sync.RWMutex
+ firstErr error
reloading uint32 // atomic
}
@@ -508,12 +549,25 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
}
-func (c *clusterStateHolder) Load() (*clusterState, error) {
+func (c *clusterStateHolder) Reload() (*clusterState, error) {
+ state, err := c.reload()
+ if err != nil {
+ return nil, err
+ }
+ if !state.IsConsistent() {
+ c.LazyReload()
+ }
+ return state, nil
+}
+
+func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
- c.lastErrMu.Lock()
- c.lastErr = err
- c.lastErrMu.Unlock()
+ c.firstErrMu.Lock()
+ if c.firstErr == nil {
+ c.firstErr = err
+ }
+ c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@@ -527,9 +581,15 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- _, err := c.Load()
- if err == nil {
- time.Sleep(time.Second)
+ for {
+ state, err := c.reload()
+ if err != nil {
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ if state.IsConsistent() {
+ return
+ }
}
}()
}
@@ -537,12 +597,16 @@ func (c *clusterStateHolder) LazyReload() {
func (c *clusterStateHolder) Get() (*clusterState, error) {
v := c.state.Load()
if v != nil {
- return v.(*clusterState), nil
+ state := v.(*clusterState)
+ if time.Since(state.createdAt) > time.Minute {
+ c.LazyReload()
+ }
+ return state, nil
}
- c.lastErrMu.RLock()
- err := c.lastErr
- c.lastErrMu.RUnlock()
+ c.firstErrMu.RLock()
+ err := c.firstErr
+ c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
@@ -576,19 +640,19 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
- opt: opt,
- nodes: newClusterNodes(opt),
- cmdsInfoCache: newCmdsInfoCache(),
+ opt: opt,
+ nodes: newClusterNodes(opt),
}
c.state = newClusterStateHolder(c.loadState)
+ c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
c.process = c.defaultProcess
c.processPipeline = c.defaultProcessPipeline
c.processTxPipeline = c.defaultProcessTxPipeline
- c.cmdable.setProcessor(c.Process)
+ c.init()
- _, _ = c.state.Load()
+ _, _ = c.state.Reload()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@@ -596,6 +660,10 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
+func (c *ClusterClient) init() {
+ c.cmdable.setProcessor(c.Process)
+}
+
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@@ -614,6 +682,7 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
func (c *ClusterClient) copy() *ClusterClient {
cp := *c
+ cp.init()
return &cp
}
@@ -626,17 +695,39 @@ func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
-func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
- cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
- node, err := c.nodes.Random()
+func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
+ addrs, err := c.nodes.Addrs()
+ if err != nil {
+ return nil, err
+ }
+
+ var firstErr error
+ for _, addr := range addrs {
+ node, err := c.nodes.Get(addr)
if err != nil {
return nil, err
}
- return node.Client.Command().Result()
- })
+ if node == nil {
+ continue
+ }
+
+ info, err := node.Client.Command().Result()
+ if err == nil {
+ return info, nil
+ }
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ return nil, firstErr
+}
+
+func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
+ cmdsInfo, err := c.cmdsInfoCache.Get()
if err != nil {
return nil
}
+
info := cmdsInfo[name]
if info == nil {
internal.Logf("info for cmd=%s not found", name)
@@ -700,13 +791,14 @@ func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 {
- return fmt.Errorf("redis: keys don't hash to the same slot")
+ return fmt.Errorf("redis: Watch requires at least one key")
}
slot := hashtag.Slot(keys[0])
for _, key := range keys[1:] {
if hashtag.Slot(key) != slot {
- return fmt.Errorf("redis: Watch requires all keys to be in the same slot")
+ err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
+ return err
}
}
@@ -812,6 +904,12 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
}
if internal.IsRetryableError(err, true) {
+ // Firstly retry the same node.
+ if attempt == 0 {
+ continue
+ }
+
+ // Secondly try random node.
node, err = c.nodes.Random()
if err != nil {
break
@@ -846,14 +944,17 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
- state, err := c.state.Get()
+ state, err := c.state.Reload()
if err != nil {
- return err
+ state, err = c.state.Get()
+ if err != nil {
+ return err
+ }
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
- for _, master := range state.masters {
+ for _, master := range state.Masters {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
@@ -879,14 +980,17 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
- state, err := c.state.Get()
+ state, err := c.state.Reload()
if err != nil {
- return err
+ state, err = c.state.Get()
+ if err != nil {
+ return err
+ }
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
- for _, slave := range state.slaves {
+ for _, slave := range state.Slaves {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
@@ -912,9 +1016,12 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
- state, err := c.state.Get()
+ state, err := c.state.Reload()
if err != nil {
- return err
+ state, err = c.state.Get()
+ if err != nil {
+ return err
+ }
}
var wg sync.WaitGroup
@@ -930,11 +1037,11 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
}
}
- for _, node := range state.masters {
+ for _, node := range state.Masters {
wg.Add(1)
go worker(node)
}
- for _, node := range state.slaves {
+ for _, node := range state.Slaves {
wg.Add(1)
go worker(node)
}
@@ -957,7 +1064,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
return &acc
}
- for _, node := range state.masters {
+ for _, node := range state.Masters {
s := node.Client.connPool.Stats()
acc.Hits += s.Hits
acc.Misses += s.Misses
@@ -968,7 +1075,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.StaleConns += s.StaleConns
}
- for _, node := range state.slaves {
+ for _, node := range state.Slaves {
s := node.Client.connPool.Stats()
acc.Hits += s.Hits
acc.Misses += s.Misses
@@ -1065,7 +1172,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
- cn, _, err := node.Client.getConn()
+ cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
@@ -1077,9 +1184,9 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
- _ = node.Client.connPool.Put(cn)
+ node.Client.connPool.Put(cn)
} else {
- _ = node.Client.connPool.Remove(cn)
+ node.Client.connPool.Remove(cn)
}
}
@@ -1229,7 +1336,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
- cn, _, err := node.Client.getConn()
+ cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
@@ -1241,9 +1348,9 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
- _ = node.Client.connPool.Put(cn)
+ node.Client.connPool.Put(cn)
} else {
- _ = node.Client.connPool.Remove(cn)
+ node.Client.connPool.Remove(cn)
}
}
@@ -1387,6 +1494,29 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
return pubsub
}
+func useOriginAddr(originAddr, nodeAddr string) bool {
+ nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+ if err != nil {
+ return false
+ }
+
+ nodeIP := net.ParseIP(nodeHost)
+ if nodeIP == nil {
+ return false
+ }
+
+ if !nodeIP.IsLoopback() {
+ return false
+ }
+
+ _, originPort, err := net.SplitHostPort(originAddr)
+ if err != nil {
+ return false
+ }
+
+ return nodePort == originPort
+}
+
func isLoopbackAddr(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {
@@ -1401,7 +1531,7 @@ func isLoopbackAddr(addr string) bool {
return ip.IsLoopback()
}
-func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
+func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {
return nodes
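Note on the cluster.go changes above: ClusterOptions gains a TLSConfig that clientOptions() forwards to every per-node client, and the new doc comment records that RouteByLatency implicitly enables ReadOnly. A minimal usage sketch — the node addresses and ServerName are placeholders, not values from this commit:

package main

import (
	"crypto/tls"
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	// One TLSConfig secures every connection in the cluster, since it is
	// copied into each node's Options.
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"10.0.0.1:6379", "10.0.0.2:6379"}, // hypothetical nodes
		TLSConfig: &tls.Config{
			ServerName: "redis.example.com", // hypothetical server name
		},
		RouteByLatency: true, // per the new comment, this also enables ReadOnly
	})
	defer client.Close()

	fmt.Println(client.Ping().Result())
}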
diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go
index 1588ca251..552c897bb 100644
--- a/vendor/github.com/go-redis/redis/command.go
+++ b/vendor/github.com/go-redis/redis/command.go
@@ -1027,17 +1027,21 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
//------------------------------------------------------------------------------
type cmdsInfoCache struct {
+ fn func() (map[string]*CommandInfo, error)
+
once internal.Once
cmds map[string]*CommandInfo
}
-func newCmdsInfoCache() *cmdsInfoCache {
- return &cmdsInfoCache{}
+func newCmdsInfoCache(fn func() (map[string]*CommandInfo, error)) *cmdsInfoCache {
+ return &cmdsInfoCache{
+ fn: fn,
+ }
}
-func (c *cmdsInfoCache) Do(fn func() (map[string]*CommandInfo, error)) (map[string]*CommandInfo, error) {
+func (c *cmdsInfoCache) Get() (map[string]*CommandInfo, error) {
err := c.once.Do(func() error {
- cmds, err := fn()
+ cmds, err := c.fn()
if err != nil {
return err
}
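The command.go hunk inverts the cmdsInfoCache API: the loader is captured at construction and Get() runs it lazily behind internal.Once, which — unlike sync.Once — leaves the guard open after a failure so later callers retry. A standalone sketch of that retry-on-error once, assuming nothing beyond the standard library (it mimics the semantics, it is not the vendored internal.Once):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOnce runs fn at most once successfully; a failed attempt does not
// latch the guard, so the next call retries.
type errOnce struct {
	mu   sync.Mutex
	done bool
}

func (o *errOnce) Do(fn func() error) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done {
		return nil
	}
	if err := fn(); err != nil {
		return err
	}
	o.done = true
	return nil
}

func main() {
	var once errOnce
	attempts := 0
	load := func() error {
		attempts++
		if attempts < 2 {
			return errors.New("transient failure")
		}
		return nil
	}
	fmt.Println(once.Do(load)) // transient failure
	fmt.Println(once.Do(load)) // <nil> - retried and cached
	fmt.Println(once.Do(load)) // <nil> - fn is not called again
}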
diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go
index a3dacacd2..c6a88154e 100644
--- a/vendor/github.com/go-redis/redis/commands.go
+++ b/vendor/github.com/go-redis/redis/commands.go
@@ -266,6 +266,8 @@ type Cmdable interface {
GeoDist(key string, member1, member2, unit string) *FloatCmd
GeoHash(key string, members ...string) *StringSliceCmd
Command() *CommandsInfoCmd
+ ReadOnly() *StatusCmd
+ ReadWrite() *StatusCmd
}
type StatefulCmdable interface {
@@ -274,8 +276,6 @@ type StatefulCmdable interface {
Select(index int) *StatusCmd
SwapDB(index1, index2 int) *StatusCmd
ClientSetName(name string) *BoolCmd
- ReadOnly() *StatusCmd
- ReadWrite() *StatusCmd
}
var _ Cmdable = (*Client)(nil)
@@ -2054,13 +2054,13 @@ func (c *cmdable) ClusterSlaves(nodeID string) *StringSliceCmd {
return cmd
}
-func (c *statefulCmdable) ReadOnly() *StatusCmd {
+func (c *cmdable) ReadOnly() *StatusCmd {
cmd := NewStatusCmd("readonly")
c.process(cmd)
return cmd
}
-func (c *statefulCmdable) ReadWrite() *StatusCmd {
+func (c *cmdable) ReadWrite() *StatusCmd {
cmd := NewStatusCmd("readwrite")
c.process(cmd)
return cmd
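ReadOnly and ReadWrite move from StatefulCmdable to Cmdable above, so they become callable on any client rather than only on stateful connections such as Tx or a pipeline. A hedged sketch — the replica address is a placeholder, and READONLY is only meaningful against a Redis Cluster node:

package main

import (
	"log"

	"github.com/go-redis/redis"
)

func main() {
	replica := redis.NewClient(&redis.Options{Addr: "replica-1:6379"}) // hypothetical
	defer replica.Close()

	// Against a standalone (non-cluster) server this returns an error.
	if err := replica.ReadOnly().Err(); err != nil {
		log.Printf("READONLY failed: %v", err)
	}
}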
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
index ae81905ea..cab66904a 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,8 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // number of free connections in the pool
+ FreeConns uint32 // deprecated - use IdleConns
+ IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -36,12 +37,12 @@ type Pooler interface {
NewConn() (*Conn, error)
CloseConn(*Conn) error
- Get() (*Conn, bool, error)
- Put(*Conn) error
- Remove(*Conn) error
+ Get() (*Conn, error)
+ Put(*Conn)
+ Remove(*Conn)
Len() int
- FreeLen() int
+ IdleLen() int
Stats() *Stats
Close() error
@@ -70,8 +71,8 @@ type ConnPool struct {
connsMu sync.Mutex
conns []*Conn
- freeConnsMu sync.Mutex
- freeConns []*Conn
+ idleConnsMu sync.RWMutex
+ idleConns []*Conn
stats Stats
@@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
- freeConns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
}
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
+
return p
}
func (p *ConnPool) NewConn() (*Conn, error) {
+ cn, err := p.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.connsMu.Unlock()
+ return cn, nil
+}
+
+func (p *ConnPool) newConn() (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return nil, err
}
- cn := NewConn(netConn)
- p.connsMu.Lock()
- p.conns = append(p.conns, cn)
- p.connsMu.Unlock()
-
- return cn, nil
+ return NewConn(netConn), nil
}
func (p *ConnPool) tryDial() {
@@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error {
}
// Get returns existed connection from the pool or creates a new one.
-func (p *ConnPool) Get() (*Conn, bool, error) {
+func (p *ConnPool) Get() (*Conn, error) {
if p.closed() {
- return nil, false, ErrClosed
+ return nil, ErrClosed
}
- select {
- case p.queue <- struct{}{}:
- default:
- timer := timers.Get().(*time.Timer)
- timer.Reset(p.opt.PoolTimeout)
-
- select {
- case p.queue <- struct{}{}:
- if !timer.Stop() {
- <-timer.C
- }
- timers.Put(timer)
- case <-timer.C:
- timers.Put(timer)
- atomic.AddUint32(&p.stats.Timeouts, 1)
- return nil, false, ErrPoolTimeout
- }
+ err := p.waitTurn()
+ if err != nil {
+ return nil, err
}
for {
- p.freeConnsMu.Lock()
- cn := p.popFree()
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ cn := p.popIdle()
+ p.idleConnsMu.Unlock()
if cn == nil {
break
@@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
}
atomic.AddUint32(&p.stats.Hits, 1)
- return cn, false, nil
+ return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn()
if err != nil {
- <-p.queue
- return nil, false, err
+ p.freeTurn()
+ return nil, err
}
- return newcn, true, nil
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
}
-func (p *ConnPool) popFree() *Conn {
- if len(p.freeConns) == 0 {
+func (p *ConnPool) waitTurn() error {
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() *Conn {
+ if len(p.idleConns) == 0 {
return nil
}
- idx := len(p.freeConns) - 1
- cn := p.freeConns[idx]
- p.freeConns = p.freeConns[:idx]
+ idx := len(p.idleConns) - 1
+ cn := p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+
return cn
}
-func (p *ConnPool) Put(cn *Conn) error {
- if data := cn.Rd.PeekBuffered(); data != nil {
- internal.Logf("connection has unread data: %q", data)
- return p.Remove(cn)
+func (p *ConnPool) Put(cn *Conn) {
+ buf := cn.Rd.PeekBuffered()
+ if buf != nil {
+ internal.Logf("connection has unread data: %.100q", buf)
+ p.Remove(cn)
+ return
}
- p.freeConnsMu.Lock()
- p.freeConns = append(p.freeConns, cn)
- p.freeConnsMu.Unlock()
- <-p.queue
- return nil
+
+ p.idleConnsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsMu.Unlock()
+ p.freeTurn()
}
-func (p *ConnPool) Remove(cn *Conn) error {
- _ = p.CloseConn(cn)
- <-p.queue
- return nil
+func (p *ConnPool) Remove(cn *Conn) {
+ p.removeConn(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
}
func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConn(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
p.connsMu.Lock()
for i, c := range p.conns {
if c == cn {
@@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
}
}
p.connsMu.Unlock()
-
- return p.closeConn(cn)
}
func (p *ConnPool) closeConn(cn *Conn) error {
@@ -263,22 +296,24 @@ func (p *ConnPool) Len() int {
return l
}
-// FreeLen returns number of free connections.
-func (p *ConnPool) FreeLen() int {
- p.freeConnsMu.Lock()
- l := len(p.freeConns)
- p.freeConnsMu.Unlock()
+// FreeLen returns number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.idleConnsMu.RLock()
+ l := len(p.idleConns)
+ p.idleConnsMu.RUnlock()
return l
}
func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(p.FreeLen()),
+ FreeConns: uint32(idleLen),
+ IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
@@ -316,41 +351,45 @@ func (p *ConnPool) Close() error {
p.conns = nil
p.connsMu.Unlock()
- p.freeConnsMu.Lock()
- p.freeConns = nil
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ p.idleConns = nil
+ p.idleConnsMu.Unlock()
return firstErr
}
-func (p *ConnPool) reapStaleConn() bool {
- if len(p.freeConns) == 0 {
- return false
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
}
- cn := p.freeConns[0]
+ cn := p.idleConns[0]
if !cn.IsStale(p.opt.IdleTimeout) {
- return false
+ return nil
}
- p.CloseConn(cn)
- p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
- return true
+ return cn
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
- p.queue <- struct{}{}
- p.freeConnsMu.Lock()
+ p.getTurn()
- reaped := p.reapStaleConn()
+ p.idleConnsMu.Lock()
+ cn := p.reapStaleConn()
+ p.idleConnsMu.Unlock()
+
+ if cn != nil {
+ p.removeConn(cn)
+ }
- p.freeConnsMu.Unlock()
- <-p.queue
+ p.freeTurn()
- if reaped {
+ if cn != nil {
+ p.closeConn(cn)
n++
} else {
break
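The pool refactor above renames free to idle, drops the isNew flag and error returns from Get/Put/Remove, and factors queue handling into getTurn/waitTurn/freeTurn: the buffered channel acts as a counting semaphore bounding checked-out connections, with PoolTimeout enforced by a timer (the vendored code additionally recycles timers through a sync.Pool, omitted here). A self-contained sketch of that semaphore pattern:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errPoolTimeout = errors.New("pool timeout")

// turnQueue bounds concurrency: sending acquires a slot, receiving frees one.
type turnQueue chan struct{}

func (q turnQueue) waitTurn(timeout time.Duration) error {
	select {
	case q <- struct{}{}: // fast path: a slot is free
		return nil
	default:
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case q <- struct{}{}:
		return nil
	case <-timer.C:
		return errPoolTimeout
	}
}

func (q turnQueue) freeTurn() { <-q }

func main() {
	q := make(turnQueue, 1)                   // pool size 1
	fmt.Println(q.waitTurn(time.Millisecond)) // <nil>
	fmt.Println(q.waitTurn(time.Millisecond)) // pool timeout
	q.freeTurn()
	fmt.Println(q.waitTurn(time.Millisecond)) // <nil>
}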
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
index ff91279b3..b35b78afb 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
@@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented")
}
-func (p *SingleConnPool) Get() (*Conn, bool, error) {
- return p.cn, false, nil
+func (p *SingleConnPool) Get() (*Conn, error) {
+ return p.cn, nil
}
-func (p *SingleConnPool) Put(cn *Conn) error {
+func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
- return nil
}
-func (p *SingleConnPool) Remove(cn *Conn) error {
+func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
- return nil
}
func (p *SingleConnPool) Len() int {
return 1
}
-func (p *SingleConnPool) FreeLen() int {
+func (p *SingleConnPool) IdleLen() int {
return 0
}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
index 17f163858..91bd91333 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
@@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error {
panic("not implemented")
}
-func (p *StickyConnPool) Get() (*Conn, bool, error) {
+func (p *StickyConnPool) Get() (*Conn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
- return nil, false, ErrClosed
+ return nil, ErrClosed
}
if p.cn != nil {
- return p.cn, false, nil
+ return p.cn, nil
}
- cn, _, err := p.pool.Get()
+ cn, err := p.pool.Get()
if err != nil {
- return nil, false, err
+ return nil, err
}
+
p.cn = cn
- return cn, true, nil
+ return cn, nil
}
-func (p *StickyConnPool) putUpstream() (err error) {
- err = p.pool.Put(p.cn)
+func (p *StickyConnPool) putUpstream() {
+ p.pool.Put(p.cn)
p.cn = nil
- return err
}
-func (p *StickyConnPool) Put(cn *Conn) error {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- if p.closed {
- return ErrClosed
- }
- return nil
-}
+func (p *StickyConnPool) Put(cn *Conn) {}
-func (p *StickyConnPool) removeUpstream() error {
- err := p.pool.Remove(p.cn)
+func (p *StickyConnPool) removeUpstream() {
+ p.pool.Remove(p.cn)
p.cn = nil
- return err
}
-func (p *StickyConnPool) Remove(cn *Conn) error {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- if p.closed {
- return nil
- }
- return p.removeUpstream()
+func (p *StickyConnPool) Remove(cn *Conn) {
+ p.removeUpstream()
}
func (p *StickyConnPool) Len() int {
@@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int {
return 1
}
-func (p *StickyConnPool) FreeLen() int {
+func (p *StickyConnPool) IdleLen() int {
p.mu.Lock()
defer p.mu.Unlock()
@@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error {
return ErrClosed
}
p.closed = true
- var err error
+
if p.cn != nil {
if p.reusable {
- err = p.putUpstream()
+ p.putUpstream()
} else {
- err = p.removeUpstream()
+ p.removeUpstream()
}
}
- return err
+
+ return nil
}
diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go
index 75648053d..35ce06195 100644
--- a/vendor/github.com/go-redis/redis/options.go
+++ b/vendor/github.com/go-redis/redis/options.go
@@ -68,8 +68,7 @@ type Options struct {
// Default is 5 minutes.
IdleTimeout time.Duration
// Frequency of idle checks.
- // Default is 1 minute.
- // When minus value is set, then idle check is disabled.
+ // Default is 1 minute. -1 disables idle check.
IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes.
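The options.go change only tightens the doc comment; behavior is unchanged. For reference, a sketch of disabling the background idle reaper as the comment describes:

package main

import (
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr:               "localhost:6379",
		IdleTimeout:        5 * time.Minute,
		IdleCheckFrequency: -1, // a negative value disables the idle check
	})
	defer client.Close()
}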
diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go
index 7a606b70e..beb632e1e 100644
--- a/vendor/github.com/go-redis/redis/redis.go
+++ b/vendor/github.com/go-redis/redis/redis.go
@@ -60,29 +60,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return cn, nil
}
-func (c *baseClient) getConn() (*pool.Conn, bool, error) {
- cn, isNew, err := c.connPool.Get()
+func (c *baseClient) getConn() (*pool.Conn, error) {
+ cn, err := c.connPool.Get()
if err != nil {
- return nil, false, err
+ return nil, err
}
if !cn.Inited {
- if err := c.initConn(cn); err != nil {
- _ = c.connPool.Remove(cn)
- return nil, false, err
+ err := c.initConn(cn)
+ if err != nil {
+ c.connPool.Remove(cn)
+ return nil, err
}
}
- return cn, isNew, nil
+ return cn, nil
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
if internal.IsBadConn(err, false) {
- _ = c.connPool.Remove(cn)
+ c.connPool.Remove(cn)
return false
}
- _ = c.connPool.Put(cn)
+ c.connPool.Put(cn)
return true
}
@@ -137,7 +138,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- cn, _, err := c.getConn()
+ cn, err := c.getConn()
if err != nil {
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
@@ -225,7 +226,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
time.Sleep(c.retryBackoff(attempt))
}
- cn, _, err := c.getConn()
+ cn, err := c.getConn()
if err != nil {
setCmdsErr(cmds, err)
return err
@@ -234,10 +235,10 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
canRetry, err := p(cn, cmds)
if err == nil || internal.IsRedisError(err) {
- _ = c.connPool.Put(cn)
+ c.connPool.Put(cn)
break
}
- _ = c.connPool.Remove(cn)
+ c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) {
break
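In redis.go, getConn loses its isNew return value and releaseConn stops discarding Put/Remove results, since those no longer return errors. The surrounding retry loop sleeps for internal.RetryBackoff between attempts; the following is a sketch of that style of capped, jittered exponential backoff — an approximation for illustration, not the vendored implementation:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retryBackoff doubles the minimum backoff per attempt, caps it at max
// (also guarding against shift overflow), and returns a random duration
// up to that bound so retrying clients do not stampede in lockstep.
func retryBackoff(attempt int, min, max time.Duration) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	backoff := min << uint(attempt)
	if backoff > max || backoff < min {
		backoff = max
	}
	if backoff == 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(backoff)))
}

func main() {
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Println(retryBackoff(attempt, 8*time.Millisecond, 512*time.Millisecond))
	}
}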
diff --git a/vendor/github.com/go-redis/redis/result.go b/vendor/github.com/go-redis/redis/result.go
index 28cea5ca8..e086e8e34 100644
--- a/vendor/github.com/go-redis/redis/result.go
+++ b/vendor/github.com/go-redis/redis/result.go
@@ -2,7 +2,7 @@ package redis
import "time"
-// NewCmdResult returns a Cmd initalised with val and err for testing
+// NewCmdResult returns a Cmd initialised with val and err for testing
func NewCmdResult(val interface{}, err error) *Cmd {
var cmd Cmd
cmd.val = val
@@ -10,7 +10,7 @@ func NewCmdResult(val interface{}, err error) *Cmd {
return &cmd
}
-// NewSliceResult returns a SliceCmd initalised with val and err for testing
+// NewSliceResult returns a SliceCmd initialised with val and err for testing
func NewSliceResult(val []interface{}, err error) *SliceCmd {
var cmd SliceCmd
cmd.val = val
@@ -18,7 +18,7 @@ func NewSliceResult(val []interface{}, err error) *SliceCmd {
return &cmd
}
-// NewStatusResult returns a StatusCmd initalised with val and err for testing
+// NewStatusResult returns a StatusCmd initialised with val and err for testing
func NewStatusResult(val string, err error) *StatusCmd {
var cmd StatusCmd
cmd.val = val
@@ -26,7 +26,7 @@ func NewStatusResult(val string, err error) *StatusCmd {
return &cmd
}
-// NewIntResult returns an IntCmd initalised with val and err for testing
+// NewIntResult returns an IntCmd initialised with val and err for testing
func NewIntResult(val int64, err error) *IntCmd {
var cmd IntCmd
cmd.val = val
@@ -34,7 +34,7 @@ func NewIntResult(val int64, err error) *IntCmd {
return &cmd
}
-// NewDurationResult returns a DurationCmd initalised with val and err for testing
+// NewDurationResult returns a DurationCmd initialised with val and err for testing
func NewDurationResult(val time.Duration, err error) *DurationCmd {
var cmd DurationCmd
cmd.val = val
@@ -42,7 +42,7 @@ func NewDurationResult(val time.Duration, err error) *DurationCmd {
return &cmd
}
-// NewBoolResult returns a BoolCmd initalised with val and err for testing
+// NewBoolResult returns a BoolCmd initialised with val and err for testing
func NewBoolResult(val bool, err error) *BoolCmd {
var cmd BoolCmd
cmd.val = val
@@ -50,7 +50,7 @@ func NewBoolResult(val bool, err error) *BoolCmd {
return &cmd
}
-// NewStringResult returns a StringCmd initalised with val and err for testing
+// NewStringResult returns a StringCmd initialised with val and err for testing
func NewStringResult(val string, err error) *StringCmd {
var cmd StringCmd
cmd.val = []byte(val)
@@ -58,7 +58,7 @@ func NewStringResult(val string, err error) *StringCmd {
return &cmd
}
-// NewFloatResult returns a FloatCmd initalised with val and err for testing
+// NewFloatResult returns a FloatCmd initialised with val and err for testing
func NewFloatResult(val float64, err error) *FloatCmd {
var cmd FloatCmd
cmd.val = val
@@ -66,7 +66,7 @@ func NewFloatResult(val float64, err error) *FloatCmd {
return &cmd
}
-// NewStringSliceResult returns a StringSliceCmd initalised with val and err for testing
+// NewStringSliceResult returns a StringSliceCmd initialised with val and err for testing
func NewStringSliceResult(val []string, err error) *StringSliceCmd {
var cmd StringSliceCmd
cmd.val = val
@@ -74,7 +74,7 @@ func NewStringSliceResult(val []string, err error) *StringSliceCmd {
return &cmd
}
-// NewBoolSliceResult returns a BoolSliceCmd initalised with val and err for testing
+// NewBoolSliceResult returns a BoolSliceCmd initialised with val and err for testing
func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd {
var cmd BoolSliceCmd
cmd.val = val
@@ -82,7 +82,7 @@ func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd {
return &cmd
}
-// NewStringStringMapResult returns a StringStringMapCmd initalised with val and err for testing
+// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing
func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd {
var cmd StringStringMapCmd
cmd.val = val
@@ -90,7 +90,7 @@ func NewStringStringMapResult(val map[string]string, err error) *StringStringMap
return &cmd
}
-// NewStringIntMapCmdResult returns a StringIntMapCmd initalised with val and err for testing
+// NewStringIntMapCmdResult returns a StringIntMapCmd initialised with val and err for testing
func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd {
var cmd StringIntMapCmd
cmd.val = val
@@ -98,7 +98,7 @@ func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd
return &cmd
}
-// NewZSliceCmdResult returns a ZSliceCmd initalised with val and err for testing
+// NewZSliceCmdResult returns a ZSliceCmd initialised with val and err for testing
func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd {
var cmd ZSliceCmd
cmd.val = val
@@ -106,7 +106,7 @@ func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd {
return &cmd
}
-// NewScanCmdResult returns a ScanCmd initalised with val and err for testing
+// NewScanCmdResult returns a ScanCmd initialised with val and err for testing
func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd {
var cmd ScanCmd
cmd.page = keys
@@ -115,7 +115,7 @@ func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd {
return &cmd
}
-// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initalised with val and err for testing
+// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initialised with val and err for testing
func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd {
var cmd ClusterSlotsCmd
cmd.val = val
@@ -123,7 +123,7 @@ func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd {
return &cmd
}
-// NewGeoLocationCmdResult returns a GeoLocationCmd initalised with val and err for testing
+// NewGeoLocationCmdResult returns a GeoLocationCmd initialised with val and err for testing
func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd {
var cmd GeoLocationCmd
cmd.locations = val
@@ -131,7 +131,7 @@ func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd {
return &cmd
}
-// NewCommandsInfoCmdResult returns a CommandsInfoCmd initalised with val and err for testing
+// NewCommandsInfoCmdResult returns a CommandsInfoCmd initialised with val and err for testing
func NewCommandsInfoCmdResult(val map[string]*CommandInfo, err error) *CommandsInfoCmd {
var cmd CommandsInfoCmd
cmd.val = val
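The result.go hunks only correct "initalised" to "initialised" in the doc comments. These constructors exist for stubbing command results in tests; a brief usage sketch:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	// Stub a successful GET and a key miss without touching a server.
	ok := redis.NewStringResult("bar", nil)
	fmt.Println(ok.Val(), ok.Err()) // bar <nil>

	miss := redis.NewStringResult("", redis.Nil)
	fmt.Println(miss.Err() == redis.Nil) // true
}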
diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go
index 6d2877413..b47a1094e 100644
--- a/vendor/github.com/go-redis/redis/ring.go
+++ b/vendor/github.com/go-redis/redis/ring.go
@@ -304,10 +304,11 @@ func NewRing(opt *RingOptions) *Ring {
opt.init()
ring := &Ring{
- opt: opt,
- shards: newRingShards(),
- cmdsInfoCache: newCmdsInfoCache(),
+ opt: opt,
+ shards: newRingShards(),
}
+ ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
+
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
@@ -428,21 +429,23 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
}
}
-func (c *Ring) cmdInfo(name string) *CommandInfo {
- cmdsInfo, err := c.cmdsInfoCache.Do(func() (map[string]*CommandInfo, error) {
- shards := c.shards.List()
- firstErr := errRingShardsDown
- for _, shard := range shards {
- cmdsInfo, err := shard.Client.Command().Result()
- if err == nil {
- return cmdsInfo, nil
- }
- if firstErr == nil {
- firstErr = err
- }
+func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
+ shards := c.shards.List()
+ firstErr := errRingShardsDown
+ for _, shard := range shards {
+ cmdsInfo, err := shard.Client.Command().Result()
+ if err == nil {
+ return cmdsInfo, nil
}
- return nil, firstErr
- })
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ return nil, firstErr
+}
+
+func (c *Ring) cmdInfo(name string) *CommandInfo {
+ cmdsInfo, err := c.cmdsInfoCache.Get()
if err != nil {
return nil
}
@@ -522,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
continue
}
- cn, _, err := shard.Client.getConn()
+ cn, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
@@ -530,10 +533,10 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
if err == nil || internal.IsRedisError(err) {
- _ = shard.Client.connPool.Put(cn)
+ shard.Client.connPool.Put(cn)
continue
}
- _ = shard.Client.connPool.Remove(cn)
+ shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil {
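Ring gets the same treatment as ClusterClient: the command-info loader moves into a named cmdsInfo method that asks each shard in turn until one answers COMMAND, and the pipeline path adopts the new pool signatures. A minimal Ring sketch — shard names and addresses are placeholders:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{ // hypothetical shards
			"shard1": "localhost:6379",
			"shard2": "localhost:6380",
		},
	})
	defer ring.Close()

	// Ping every shard; the first error, if any, is returned.
	err := ring.ForEachShard(func(shard *redis.Client) error {
		return shard.Ping().Err()
	})
	fmt.Println(err)
}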
diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go
index 3f56f08b3..3cedf36ee 100644
--- a/vendor/github.com/go-redis/redis/sentinel.go
+++ b/vendor/github.com/go-redis/redis/sentinel.go
@@ -1,6 +1,7 @@
package redis
import (
+ "crypto/tls"
"errors"
"net"
"strings"
@@ -38,6 +39,8 @@ type FailoverOptions struct {
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
+
+ TLSConfig *tls.Config
}
func (opt *FailoverOptions) options() *Options {
@@ -59,6 +62,8 @@ func (opt *FailoverOptions) options() *Options {
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
+
+ TLSConfig: opt.TLSConfig,
}
}
@@ -94,25 +99,23 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
//------------------------------------------------------------------------------
-type sentinelClient struct {
- cmdable
+type SentinelClient struct {
baseClient
}
-func newSentinel(opt *Options) *sentinelClient {
+func NewSentinelClient(opt *Options) *SentinelClient {
opt.init()
- c := sentinelClient{
+ c := &SentinelClient{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
}
c.baseClient.init()
- c.cmdable.setProcessor(c.Process)
- return &c
+ return c
}
-func (c *sentinelClient) PubSub() *PubSub {
+func (c *SentinelClient) PubSub() *PubSub {
return &PubSub{
opt: c.opt,
@@ -123,13 +126,13 @@ func (c *sentinelClient) PubSub() *PubSub {
}
}
-func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
+func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
-func (c *sentinelClient) Sentinels(name string) *SliceCmd {
+func (c *SentinelClient) Sentinels(name string) *SliceCmd {
cmd := NewSliceCmd("SENTINEL", "sentinels", name)
c.Process(cmd)
return cmd
@@ -146,7 +149,7 @@ type sentinelFailover struct {
mu sync.RWMutex
masterName string
_masterAddr string
- sentinel *sentinelClient
+ sentinel *SentinelClient
}
func (d *sentinelFailover) Close() error {
@@ -200,7 +203,7 @@ func (d *sentinelFailover) masterAddr() (string, error) {
}
for i, sentinelAddr := range d.sentinelAddrs {
- sentinel := newSentinel(&Options{
+ sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
DialTimeout: d.opt.DialTimeout,
@@ -214,7 +217,8 @@ func (d *sentinelFailover) masterAddr() (string, error) {
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
- internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", d.masterName, err)
+ internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
+ d.masterName, err)
sentinel.Close()
continue
}
@@ -241,7 +245,7 @@ func (d *sentinelFailover) switchMaster(masterAddr string) {
d._masterAddr = masterAddr
}
-func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {
+func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
d.discoverSentinels(sentinel)
d.sentinel = sentinel
go d.listen(sentinel)
@@ -263,7 +267,7 @@ func (d *sentinelFailover) _resetSentinel() error {
return err
}
-func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
+func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
@@ -287,7 +291,7 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
}
}
-func (d *sentinelFailover) listen(sentinel *sentinelClient) {
+func (d *sentinelFailover) listen(sentinel *SentinelClient) {
var pubsub *PubSub
for {
if pubsub == nil {
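sentinel.go promotes the unexported sentinelClient to an exported SentinelClient with a NewSentinelClient constructor, and FailoverOptions gains TLSConfig, mirroring the cluster change. A hedged sketch of querying a sentinel directly — the address and master name are placeholders:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	sentinel := redis.NewSentinelClient(&redis.Options{
		Addr: "localhost:26379", // hypothetical sentinel address
	})
	defer sentinel.Close()

	// On success, addr is []string{host, port} of the current master.
	addr, err := sentinel.GetMasterAddrByName("mymaster").Result()
	fmt.Println(addr, err)
}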
diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go
index fde3c4150..9e30c81d9 100644
--- a/vendor/github.com/go-redis/redis/universal.go
+++ b/vendor/github.com/go-redis/redis/universal.go
@@ -1,6 +1,9 @@
package redis
-import "time"
+import (
+ "crypto/tls"
+ "time"
+)
// UniversalOptions information is required by UniversalClient to establish
// connections.
@@ -27,6 +30,7 @@ type UniversalOptions struct {
// Common options
+ OnConnect func(*Conn) error
MaxRetries int
Password string
DialTimeout time.Duration
@@ -36,6 +40,7 @@ type UniversalOptions struct {
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
+ TLSConfig *tls.Config
}
func (o *UniversalOptions) cluster() *ClusterOptions {
@@ -49,6 +54,7 @@ func (o *UniversalOptions) cluster() *ClusterOptions {
RouteByLatency: o.RouteByLatency,
ReadOnly: o.ReadOnly,
+ OnConnect: o.OnConnect,
MaxRetries: o.MaxRetries,
Password: o.Password,
DialTimeout: o.DialTimeout,
@@ -58,6 +64,7 @@ func (o *UniversalOptions) cluster() *ClusterOptions {
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
+ TLSConfig: o.TLSConfig,
}
}
@@ -71,6 +78,7 @@ func (o *UniversalOptions) failover() *FailoverOptions {
MasterName: o.MasterName,
DB: o.DB,
+ OnConnect: o.OnConnect,
MaxRetries: o.MaxRetries,
Password: o.Password,
DialTimeout: o.DialTimeout,
@@ -80,6 +88,7 @@ func (o *UniversalOptions) failover() *FailoverOptions {
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
+ TLSConfig: o.TLSConfig,
}
}
@@ -93,6 +102,7 @@ func (o *UniversalOptions) simple() *Options {
Addr: addr,
DB: o.DB,
+ OnConnect: o.OnConnect,
MaxRetries: o.MaxRetries,
Password: o.Password,
DialTimeout: o.DialTimeout,
@@ -102,6 +112,7 @@ func (o *UniversalOptions) simple() *Options {
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
+ TLSConfig: o.TLSConfig,
}
}
@@ -113,6 +124,7 @@ func (o *UniversalOptions) simple() *Options {
// applications locally.
type UniversalClient interface {
Cmdable
+ Watch(fn func(*Tx) error, keys ...string) error
Process(cmd Cmder) error
WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error)
Subscribe(channels ...string) *PubSub
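universal.go rounds out the release: UniversalOptions gains OnConnect and TLSConfig, copied into all three concrete option types, and Watch joins the UniversalClient interface so optimistic-locking code can be written once against single-node, sentinel, or cluster deployments. A sketch using the canonical WATCH-based counter increment — the key name and address are placeholders:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{"localhost:6379"}, // one addr selects the simple client
	})
	defer client.Close()

	// Watch is now available through the interface, not just on *Client.
	err := client.Watch(func(tx *redis.Tx) error {
		n, err := tx.Get("counter").Int64()
		if err != nil && err != redis.Nil {
			return err
		}
		// The MULTI/EXEC block commits only if "counter" is unchanged.
		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Set("counter", n+1, 0)
			return nil
		})
		return err
	}, "counter")
	fmt.Println(err)
}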