diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis/internal/pool')
3 files changed, 147 insertions, 124 deletions
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index ae81905ea..cab66904a 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,8 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // number of free connections in the pool + FreeConns uint32 // deprecated - use IdleConns + IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -36,12 +37,12 @@ type Pooler interface { NewConn() (*Conn, error) CloseConn(*Conn) error - Get() (*Conn, bool, error) - Put(*Conn) error - Remove(*Conn) error + Get() (*Conn, error) + Put(*Conn) + Remove(*Conn) Len() int - FreeLen() int + IdleLen() int Stats() *Stats Close() error @@ -70,8 +71,8 @@ type ConnPool struct { connsMu sync.Mutex conns []*Conn - freeConnsMu sync.Mutex - freeConns []*Conn + idleConnsMu sync.RWMutex + idleConns []*Conn stats Stats @@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool { queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), - freeConns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } + return p } func (p *ConnPool) NewConn() (*Conn, error) { + cn, err := p.newConn() + if err != nil { + return nil, err + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.connsMu.Unlock() + return cn, nil +} + +func (p *ConnPool) newConn() (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) { return nil, err } - cn := NewConn(netConn) - p.connsMu.Lock() - p.conns = append(p.conns, cn) - p.connsMu.Unlock() - - return cn, nil + return NewConn(netConn), nil } func (p *ConnPool) tryDial() { @@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error { } // Get returns existed connection from the pool or creates a new one. -func (p *ConnPool) Get() (*Conn, bool, error) { +func (p *ConnPool) Get() (*Conn, error) { if p.closed() { - return nil, false, ErrClosed + return nil, ErrClosed } - select { - case p.queue <- struct{}{}: - default: - timer := timers.Get().(*time.Timer) - timer.Reset(p.opt.PoolTimeout) - - select { - case p.queue <- struct{}{}: - if !timer.Stop() { - <-timer.C - } - timers.Put(timer) - case <-timer.C: - timers.Put(timer) - atomic.AddUint32(&p.stats.Timeouts, 1) - return nil, false, ErrPoolTimeout - } + err := p.waitTurn() + if err != nil { + return nil, err } for { - p.freeConnsMu.Lock() - cn := p.popFree() - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + cn := p.popIdle() + p.idleConnsMu.Unlock() if cn == nil { break @@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) { } atomic.AddUint32(&p.stats.Hits, 1) - return cn, false, nil + return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) newcn, err := p.NewConn() if err != nil { - <-p.queue - return nil, false, err + p.freeTurn() + return nil, err } - return newcn, true, nil + return newcn, nil +} + +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} } -func (p *ConnPool) popFree() *Conn { - if len(p.freeConns) == 0 { +func (p *ConnPool) waitTurn() error { + select { + case p.queue <- struct{}{}: + return nil + default: + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() *Conn { + if len(p.idleConns) == 0 { return nil } - idx := len(p.freeConns) - 1 - cn := p.freeConns[idx] - p.freeConns = p.freeConns[:idx] + idx := len(p.idleConns) - 1 + cn := p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + return cn } -func (p *ConnPool) Put(cn *Conn) error { - if data := cn.Rd.PeekBuffered(); data != nil { - internal.Logf("connection has unread data: %q", data) - return p.Remove(cn) +func (p *ConnPool) Put(cn *Conn) { + buf := cn.Rd.PeekBuffered() + if buf != nil { + internal.Logf("connection has unread data: %.100q", buf) + p.Remove(cn) + return } - p.freeConnsMu.Lock() - p.freeConns = append(p.freeConns, cn) - p.freeConnsMu.Unlock() - <-p.queue - return nil + + p.idleConnsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsMu.Unlock() + p.freeTurn() } -func (p *ConnPool) Remove(cn *Conn) error { - _ = p.CloseConn(cn) - <-p.queue - return nil +func (p *ConnPool) Remove(cn *Conn) { + p.removeConn(cn) + p.freeTurn() + _ = p.closeConn(cn) } func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConn(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConn(cn *Conn) { p.connsMu.Lock() for i, c := range p.conns { if c == cn { @@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error { } } p.connsMu.Unlock() - - return p.closeConn(cn) } func (p *ConnPool) closeConn(cn *Conn) error { @@ -263,22 +296,24 @@ func (p *ConnPool) Len() int { return l } -// FreeLen returns number of free connections. -func (p *ConnPool) FreeLen() int { - p.freeConnsMu.Lock() - l := len(p.freeConns) - p.freeConnsMu.Unlock() +// FreeLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.idleConnsMu.RLock() + l := len(p.idleConns) + p.idleConnsMu.RUnlock() return l } func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() return &Stats{ Hits: atomic.LoadUint32(&p.stats.Hits), Misses: atomic.LoadUint32(&p.stats.Misses), Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(p.FreeLen()), + FreeConns: uint32(idleLen), + IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -316,41 +351,45 @@ func (p *ConnPool) Close() error { p.conns = nil p.connsMu.Unlock() - p.freeConnsMu.Lock() - p.freeConns = nil - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + p.idleConns = nil + p.idleConnsMu.Unlock() return firstErr } -func (p *ConnPool) reapStaleConn() bool { - if len(p.freeConns) == 0 { - return false +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil } - cn := p.freeConns[0] + cn := p.idleConns[0] if !cn.IsStale(p.opt.IdleTimeout) { - return false + return nil } - p.CloseConn(cn) - p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...) + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) - return true + return cn } func (p *ConnPool) ReapStaleConns() (int, error) { var n int for { - p.queue <- struct{}{} - p.freeConnsMu.Lock() + p.getTurn() - reaped := p.reapStaleConn() + p.idleConnsMu.Lock() + cn := p.reapStaleConn() + p.idleConnsMu.Unlock() + + if cn != nil { + p.removeConn(cn) + } - p.freeConnsMu.Unlock() - <-p.queue + p.freeTurn() - if reaped { + if cn != nil { + p.closeConn(cn) n++ } else { break diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go index ff91279b3..b35b78afb 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go @@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *SingleConnPool) Get() (*Conn, bool, error) { - return p.cn, false, nil +func (p *SingleConnPool) Get() (*Conn, error) { + return p.cn, nil } -func (p *SingleConnPool) Put(cn *Conn) error { +func (p *SingleConnPool) Put(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } -func (p *SingleConnPool) Remove(cn *Conn) error { +func (p *SingleConnPool) Remove(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } func (p *SingleConnPool) Len() int { return 1 } -func (p *SingleConnPool) FreeLen() int { +func (p *SingleConnPool) IdleLen() int { return 0 } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go index 17f163858..91bd91333 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go @@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *StickyConnPool) Get() (*Conn, bool, error) { +func (p *StickyConnPool) Get() (*Conn, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { - return nil, false, ErrClosed + return nil, ErrClosed } if p.cn != nil { - return p.cn, false, nil + return p.cn, nil } - cn, _, err := p.pool.Get() + cn, err := p.pool.Get() if err != nil { - return nil, false, err + return nil, err } + p.cn = cn - return cn, true, nil + return cn, nil } -func (p *StickyConnPool) putUpstream() (err error) { - err = p.pool.Put(p.cn) +func (p *StickyConnPool) putUpstream() { + p.pool.Put(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Put(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return ErrClosed - } - return nil -} +func (p *StickyConnPool) Put(cn *Conn) {} -func (p *StickyConnPool) removeUpstream() error { - err := p.pool.Remove(p.cn) +func (p *StickyConnPool) removeUpstream() { + p.pool.Remove(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Remove(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return nil - } - return p.removeUpstream() +func (p *StickyConnPool) Remove(cn *Conn) { + p.removeUpstream() } func (p *StickyConnPool) Len() int { @@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int { return 1 } -func (p *StickyConnPool) FreeLen() int { +func (p *StickyConnPool) IdleLen() int { p.mu.Lock() defer p.mu.Unlock() @@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error { return ErrClosed } p.closed = true - var err error + if p.cn != nil { if p.reusable { - err = p.putUpstream() + p.putUpstream() } else { - err = p.removeUpstream() + p.removeUpstream() } } - return err + + return nil } |