summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/internal
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/internal')
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool.go205
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool_single.go12
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go54
3 files changed, 147 insertions, 124 deletions
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
index ae81905ea..cab66904a 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,8 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // number of free connections in the pool
+ FreeConns uint32 // deprecated - use IdleConns
+ IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -36,12 +37,12 @@ type Pooler interface {
NewConn() (*Conn, error)
CloseConn(*Conn) error
- Get() (*Conn, bool, error)
- Put(*Conn) error
- Remove(*Conn) error
+ Get() (*Conn, error)
+ Put(*Conn)
+ Remove(*Conn)
Len() int
- FreeLen() int
+ IdleLen() int
Stats() *Stats
Close() error
@@ -70,8 +71,8 @@ type ConnPool struct {
connsMu sync.Mutex
conns []*Conn
- freeConnsMu sync.Mutex
- freeConns []*Conn
+ idleConnsMu sync.RWMutex
+ idleConns []*Conn
stats Stats
@@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
- freeConns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
}
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
+
return p
}
func (p *ConnPool) NewConn() (*Conn, error) {
+ cn, err := p.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.connsMu.Unlock()
+ return cn, nil
+}
+
+func (p *ConnPool) newConn() (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return nil, err
}
- cn := NewConn(netConn)
- p.connsMu.Lock()
- p.conns = append(p.conns, cn)
- p.connsMu.Unlock()
-
- return cn, nil
+ return NewConn(netConn), nil
}
func (p *ConnPool) tryDial() {
@@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error {
}
// Get returns existed connection from the pool or creates a new one.
-func (p *ConnPool) Get() (*Conn, bool, error) {
+func (p *ConnPool) Get() (*Conn, error) {
if p.closed() {
- return nil, false, ErrClosed
+ return nil, ErrClosed
}
- select {
- case p.queue <- struct{}{}:
- default:
- timer := timers.Get().(*time.Timer)
- timer.Reset(p.opt.PoolTimeout)
-
- select {
- case p.queue <- struct{}{}:
- if !timer.Stop() {
- <-timer.C
- }
- timers.Put(timer)
- case <-timer.C:
- timers.Put(timer)
- atomic.AddUint32(&p.stats.Timeouts, 1)
- return nil, false, ErrPoolTimeout
- }
+ err := p.waitTurn()
+ if err != nil {
+ return nil, err
}
for {
- p.freeConnsMu.Lock()
- cn := p.popFree()
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ cn := p.popIdle()
+ p.idleConnsMu.Unlock()
if cn == nil {
break
@@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
}
atomic.AddUint32(&p.stats.Hits, 1)
- return cn, false, nil
+ return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn()
if err != nil {
- <-p.queue
- return nil, false, err
+ p.freeTurn()
+ return nil, err
}
- return newcn, true, nil
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
}
-func (p *ConnPool) popFree() *Conn {
- if len(p.freeConns) == 0 {
+func (p *ConnPool) waitTurn() error {
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() *Conn {
+ if len(p.idleConns) == 0 {
return nil
}
- idx := len(p.freeConns) - 1
- cn := p.freeConns[idx]
- p.freeConns = p.freeConns[:idx]
+ idx := len(p.idleConns) - 1
+ cn := p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+
return cn
}
-func (p *ConnPool) Put(cn *Conn) error {
- if data := cn.Rd.PeekBuffered(); data != nil {
- internal.Logf("connection has unread data: %q", data)
- return p.Remove(cn)
+func (p *ConnPool) Put(cn *Conn) {
+ buf := cn.Rd.PeekBuffered()
+ if buf != nil {
+ internal.Logf("connection has unread data: %.100q", buf)
+ p.Remove(cn)
+ return
}
- p.freeConnsMu.Lock()
- p.freeConns = append(p.freeConns, cn)
- p.freeConnsMu.Unlock()
- <-p.queue
- return nil
+
+ p.idleConnsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsMu.Unlock()
+ p.freeTurn()
}
-func (p *ConnPool) Remove(cn *Conn) error {
- _ = p.CloseConn(cn)
- <-p.queue
- return nil
+func (p *ConnPool) Remove(cn *Conn) {
+ p.removeConn(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
}
func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConn(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
p.connsMu.Lock()
for i, c := range p.conns {
if c == cn {
@@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
}
}
p.connsMu.Unlock()
-
- return p.closeConn(cn)
}
func (p *ConnPool) closeConn(cn *Conn) error {
@@ -263,22 +296,24 @@ func (p *ConnPool) Len() int {
return l
}
-// FreeLen returns number of free connections.
-func (p *ConnPool) FreeLen() int {
- p.freeConnsMu.Lock()
- l := len(p.freeConns)
- p.freeConnsMu.Unlock()
+// FreeLen returns number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.idleConnsMu.RLock()
+ l := len(p.idleConns)
+ p.idleConnsMu.RUnlock()
return l
}
func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(p.FreeLen()),
+ FreeConns: uint32(idleLen),
+ IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
@@ -316,41 +351,45 @@ func (p *ConnPool) Close() error {
p.conns = nil
p.connsMu.Unlock()
- p.freeConnsMu.Lock()
- p.freeConns = nil
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ p.idleConns = nil
+ p.idleConnsMu.Unlock()
return firstErr
}
-func (p *ConnPool) reapStaleConn() bool {
- if len(p.freeConns) == 0 {
- return false
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
}
- cn := p.freeConns[0]
+ cn := p.idleConns[0]
if !cn.IsStale(p.opt.IdleTimeout) {
- return false
+ return nil
}
- p.CloseConn(cn)
- p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
- return true
+ return cn
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
- p.queue <- struct{}{}
- p.freeConnsMu.Lock()
+ p.getTurn()
- reaped := p.reapStaleConn()
+ p.idleConnsMu.Lock()
+ cn := p.reapStaleConn()
+ p.idleConnsMu.Unlock()
+
+ if cn != nil {
+ p.removeConn(cn)
+ }
- p.freeConnsMu.Unlock()
- <-p.queue
+ p.freeTurn()
- if reaped {
+ if cn != nil {
+ p.closeConn(cn)
n++
} else {
break
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
index ff91279b3..b35b78afb 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
@@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented")
}
-func (p *SingleConnPool) Get() (*Conn, bool, error) {
- return p.cn, false, nil
+func (p *SingleConnPool) Get() (*Conn, error) {
+ return p.cn, nil
}
-func (p *SingleConnPool) Put(cn *Conn) error {
+func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
- return nil
}
-func (p *SingleConnPool) Remove(cn *Conn) error {
+func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
- return nil
}
func (p *SingleConnPool) Len() int {
return 1
}
-func (p *SingleConnPool) FreeLen() int {
+func (p *SingleConnPool) IdleLen() int {
return 0
}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
index 17f163858..91bd91333 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
@@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error {
panic("not implemented")
}
-func (p *StickyConnPool) Get() (*Conn, bool, error) {
+func (p *StickyConnPool) Get() (*Conn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
- return nil, false, ErrClosed
+ return nil, ErrClosed
}
if p.cn != nil {
- return p.cn, false, nil
+ return p.cn, nil
}
- cn, _, err := p.pool.Get()
+ cn, err := p.pool.Get()
if err != nil {
- return nil, false, err
+ return nil, err
}
+
p.cn = cn
- return cn, true, nil
+ return cn, nil
}
-func (p *StickyConnPool) putUpstream() (err error) {
- err = p.pool.Put(p.cn)
+func (p *StickyConnPool) putUpstream() {
+ p.pool.Put(p.cn)
p.cn = nil
- return err
}
-func (p *StickyConnPool) Put(cn *Conn) error {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- if p.closed {
- return ErrClosed
- }
- return nil
-}
+func (p *StickyConnPool) Put(cn *Conn) {}
-func (p *StickyConnPool) removeUpstream() error {
- err := p.pool.Remove(p.cn)
+func (p *StickyConnPool) removeUpstream() {
+ p.pool.Remove(p.cn)
p.cn = nil
- return err
}
-func (p *StickyConnPool) Remove(cn *Conn) error {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- if p.closed {
- return nil
- }
- return p.removeUpstream()
+func (p *StickyConnPool) Remove(cn *Conn) {
+ p.removeUpstream()
}
func (p *StickyConnPool) Len() int {
@@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int {
return 1
}
-func (p *StickyConnPool) FreeLen() int {
+func (p *StickyConnPool) IdleLen() int {
p.mu.Lock()
defer p.mu.Unlock()
@@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error {
return ErrClosed
}
p.closed = true
- var err error
+
if p.cn != nil {
if p.reusable {
- err = p.putUpstream()
+ p.putUpstream()
} else {
- err = p.removeUpstream()
+ p.removeUpstream()
}
}
- return err
+
+ return nil
}