summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/internal/pool/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis/redis/internal/pool/pool.go')
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool.go205
1 files changed, 122 insertions, 83 deletions
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
index ae81905ea..cab66904a 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,8 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // number of free connections in the pool
+ FreeConns uint32 // deprecated - use IdleConns
+ IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -36,12 +37,12 @@ type Pooler interface {
NewConn() (*Conn, error)
CloseConn(*Conn) error
- Get() (*Conn, bool, error)
- Put(*Conn) error
- Remove(*Conn) error
+ Get() (*Conn, error)
+ Put(*Conn)
+ Remove(*Conn)
Len() int
- FreeLen() int
+ IdleLen() int
Stats() *Stats
Close() error
@@ -70,8 +71,8 @@ type ConnPool struct {
connsMu sync.Mutex
conns []*Conn
- freeConnsMu sync.Mutex
- freeConns []*Conn
+ idleConnsMu sync.RWMutex
+ idleConns []*Conn
stats Stats
@@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
- freeConns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
}
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
+
return p
}
func (p *ConnPool) NewConn() (*Conn, error) {
+ cn, err := p.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.connsMu.Unlock()
+ return cn, nil
+}
+
+func (p *ConnPool) newConn() (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return nil, err
}
- cn := NewConn(netConn)
- p.connsMu.Lock()
- p.conns = append(p.conns, cn)
- p.connsMu.Unlock()
-
- return cn, nil
+ return NewConn(netConn), nil
}
func (p *ConnPool) tryDial() {
@@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error {
}
// Get returns existed connection from the pool or creates a new one.
-func (p *ConnPool) Get() (*Conn, bool, error) {
+func (p *ConnPool) Get() (*Conn, error) {
if p.closed() {
- return nil, false, ErrClosed
+ return nil, ErrClosed
}
- select {
- case p.queue <- struct{}{}:
- default:
- timer := timers.Get().(*time.Timer)
- timer.Reset(p.opt.PoolTimeout)
-
- select {
- case p.queue <- struct{}{}:
- if !timer.Stop() {
- <-timer.C
- }
- timers.Put(timer)
- case <-timer.C:
- timers.Put(timer)
- atomic.AddUint32(&p.stats.Timeouts, 1)
- return nil, false, ErrPoolTimeout
- }
+ err := p.waitTurn()
+ if err != nil {
+ return nil, err
}
for {
- p.freeConnsMu.Lock()
- cn := p.popFree()
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ cn := p.popIdle()
+ p.idleConnsMu.Unlock()
if cn == nil {
break
@@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
}
atomic.AddUint32(&p.stats.Hits, 1)
- return cn, false, nil
+ return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn()
if err != nil {
- <-p.queue
- return nil, false, err
+ p.freeTurn()
+ return nil, err
}
- return newcn, true, nil
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
}
-func (p *ConnPool) popFree() *Conn {
- if len(p.freeConns) == 0 {
+func (p *ConnPool) waitTurn() error {
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() *Conn {
+ if len(p.idleConns) == 0 {
return nil
}
- idx := len(p.freeConns) - 1
- cn := p.freeConns[idx]
- p.freeConns = p.freeConns[:idx]
+ idx := len(p.idleConns) - 1
+ cn := p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+
return cn
}
-func (p *ConnPool) Put(cn *Conn) error {
- if data := cn.Rd.PeekBuffered(); data != nil {
- internal.Logf("connection has unread data: %q", data)
- return p.Remove(cn)
+func (p *ConnPool) Put(cn *Conn) {
+ buf := cn.Rd.PeekBuffered()
+ if buf != nil {
+ internal.Logf("connection has unread data: %.100q", buf)
+ p.Remove(cn)
+ return
}
- p.freeConnsMu.Lock()
- p.freeConns = append(p.freeConns, cn)
- p.freeConnsMu.Unlock()
- <-p.queue
- return nil
+
+ p.idleConnsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsMu.Unlock()
+ p.freeTurn()
}
-func (p *ConnPool) Remove(cn *Conn) error {
- _ = p.CloseConn(cn)
- <-p.queue
- return nil
+func (p *ConnPool) Remove(cn *Conn) {
+ p.removeConn(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
}
func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConn(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
p.connsMu.Lock()
for i, c := range p.conns {
if c == cn {
@@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
}
}
p.connsMu.Unlock()
-
- return p.closeConn(cn)
}
func (p *ConnPool) closeConn(cn *Conn) error {
@@ -263,22 +296,24 @@ func (p *ConnPool) Len() int {
return l
}
-// FreeLen returns number of free connections.
-func (p *ConnPool) FreeLen() int {
- p.freeConnsMu.Lock()
- l := len(p.freeConns)
- p.freeConnsMu.Unlock()
+// FreeLen returns number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.idleConnsMu.RLock()
+ l := len(p.idleConns)
+ p.idleConnsMu.RUnlock()
return l
}
func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(p.FreeLen()),
+ FreeConns: uint32(idleLen),
+ IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
}
@@ -316,41 +351,45 @@ func (p *ConnPool) Close() error {
p.conns = nil
p.connsMu.Unlock()
- p.freeConnsMu.Lock()
- p.freeConns = nil
- p.freeConnsMu.Unlock()
+ p.idleConnsMu.Lock()
+ p.idleConns = nil
+ p.idleConnsMu.Unlock()
return firstErr
}
-func (p *ConnPool) reapStaleConn() bool {
- if len(p.freeConns) == 0 {
- return false
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
}
- cn := p.freeConns[0]
+ cn := p.idleConns[0]
if !cn.IsStale(p.opt.IdleTimeout) {
- return false
+ return nil
}
- p.CloseConn(cn)
- p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
- return true
+ return cn
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
- p.queue <- struct{}{}
- p.freeConnsMu.Lock()
+ p.getTurn()
- reaped := p.reapStaleConn()
+ p.idleConnsMu.Lock()
+ cn := p.reapStaleConn()
+ p.idleConnsMu.Unlock()
+
+ if cn != nil {
+ p.removeConn(cn)
+ }
- p.freeConnsMu.Unlock()
- <-p.queue
+ p.freeTurn()
- if reaped {
+ if cn != nil {
+ p.closeConn(cn)
n++
} else {
break