summaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/gopkg.in/redis.v2/pool.go
diff options
context:
space:
mode:
author=Corey Hulen <corey@hulen.com>2015-07-29 01:26:10 -0800
committer=Corey Hulen <corey@hulen.com>2015-07-29 01:26:10 -0800
commit9677a9f71777d75f3def0b0cb238050a30ec6a67 (patch)
tree8579fae8134ef2167a2cc97d95b0225e8acfdb46 /Godeps/_workspace/src/gopkg.in/redis.v2/pool.go
parent81e55eb57c367a1403f693712411c0781287ea55 (diff)
downloadchat-9677a9f71777d75f3def0b0cb238050a30ec6a67.tar.gz
chat-9677a9f71777d75f3def0b0cb238050a30ec6a67.tar.bz2
chat-9677a9f71777d75f3def0b0cb238050a30ec6a67.zip
Fixes mm-1355 adds rate limiting apis
Diffstat (limited to 'Godeps/_workspace/src/gopkg.in/redis.v2/pool.go')
-rw-r--r--Godeps/_workspace/src/gopkg.in/redis.v2/pool.go405
1 files changed, 0 insertions, 405 deletions
diff --git a/Godeps/_workspace/src/gopkg.in/redis.v2/pool.go b/Godeps/_workspace/src/gopkg.in/redis.v2/pool.go
deleted file mode 100644
index bca4d1963..000000000
--- a/Godeps/_workspace/src/gopkg.in/redis.v2/pool.go
+++ /dev/null
@@ -1,405 +0,0 @@
-package redis
-
-import (
- "container/list"
- "errors"
- "log"
- "net"
- "sync"
- "time"
-
- "gopkg.in/bufio.v1"
-)
-
-var (
- errClosed = errors.New("redis: client is closed")
- errRateLimited = errors.New("redis: you open connections too fast")
-)
-
-var (
- zeroTime = time.Time{}
-)
-
-type pool interface {
- Get() (*conn, bool, error)
- Put(*conn) error
- Remove(*conn) error
- Len() int
- Size() int
- Close() error
- Filter(func(*conn) bool)
-}
-
-//------------------------------------------------------------------------------
-
-type conn struct {
- netcn net.Conn
- rd *bufio.Reader
- buf []byte
-
- inUse bool
- usedAt time.Time
-
- readTimeout time.Duration
- writeTimeout time.Duration
-
- elem *list.Element
-}
-
-func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
- return func() (*conn, error) {
- netcn, err := dial()
- if err != nil {
- return nil, err
- }
- cn := &conn{
- netcn: netcn,
- buf: make([]byte, 0, 64),
- }
- cn.rd = bufio.NewReader(cn)
- return cn, nil
- }
-}
-
-func (cn *conn) Read(b []byte) (int, error) {
- if cn.readTimeout != 0 {
- cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
- } else {
- cn.netcn.SetReadDeadline(zeroTime)
- }
- return cn.netcn.Read(b)
-}
-
-func (cn *conn) Write(b []byte) (int, error) {
- if cn.writeTimeout != 0 {
- cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
- } else {
- cn.netcn.SetWriteDeadline(zeroTime)
- }
- return cn.netcn.Write(b)
-}
-
-func (cn *conn) RemoteAddr() net.Addr {
- return cn.netcn.RemoteAddr()
-}
-
-func (cn *conn) Close() error {
- return cn.netcn.Close()
-}
-
-//------------------------------------------------------------------------------
-
-type connPool struct {
- dial func() (*conn, error)
- rl *rateLimiter
-
- opt *options
-
- cond *sync.Cond
- conns *list.List
-
- idleNum int
- closed bool
-}
-
-func newConnPool(dial func() (*conn, error), opt *options) *connPool {
- return &connPool{
- dial: dial,
- rl: newRateLimiter(time.Second, 2*opt.PoolSize),
-
- opt: opt,
-
- cond: sync.NewCond(&sync.Mutex{}),
- conns: list.New(),
- }
-}
-
-func (p *connPool) new() (*conn, error) {
- if !p.rl.Check() {
- return nil, errRateLimited
- }
- return p.dial()
-}
-
-func (p *connPool) Get() (*conn, bool, error) {
- p.cond.L.Lock()
-
- if p.closed {
- p.cond.L.Unlock()
- return nil, false, errClosed
- }
-
- if p.opt.IdleTimeout > 0 {
- for el := p.conns.Front(); el != nil; el = el.Next() {
- cn := el.Value.(*conn)
- if cn.inUse {
- break
- }
- if time.Since(cn.usedAt) > p.opt.IdleTimeout {
- if err := p.remove(cn); err != nil {
- log.Printf("remove failed: %s", err)
- }
- }
- }
- }
-
- for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
- p.cond.Wait()
- }
-
- if p.idleNum > 0 {
- elem := p.conns.Front()
- cn := elem.Value.(*conn)
- if cn.inUse {
- panic("pool: precondition failed")
- }
- cn.inUse = true
- p.conns.MoveToBack(elem)
- p.idleNum--
-
- p.cond.L.Unlock()
- return cn, false, nil
- }
-
- if p.conns.Len() < p.opt.PoolSize {
- cn, err := p.new()
- if err != nil {
- p.cond.L.Unlock()
- return nil, false, err
- }
-
- cn.inUse = true
- cn.elem = p.conns.PushBack(cn)
-
- p.cond.L.Unlock()
- return cn, true, nil
- }
-
- panic("not reached")
-}
-
-func (p *connPool) Put(cn *conn) error {
- if cn.rd.Buffered() != 0 {
- b, _ := cn.rd.ReadN(cn.rd.Buffered())
- log.Printf("redis: connection has unread data: %q", b)
- return p.Remove(cn)
- }
-
- if p.opt.IdleTimeout > 0 {
- cn.usedAt = time.Now()
- }
-
- p.cond.L.Lock()
- if p.closed {
- p.cond.L.Unlock()
- return errClosed
- }
- cn.inUse = false
- p.conns.MoveToFront(cn.elem)
- p.idleNum++
- p.cond.Signal()
- p.cond.L.Unlock()
-
- return nil
-}
-
-func (p *connPool) Remove(cn *conn) error {
- p.cond.L.Lock()
- if p.closed {
- // Noop, connection is already closed.
- p.cond.L.Unlock()
- return nil
- }
- err := p.remove(cn)
- p.cond.Signal()
- p.cond.L.Unlock()
- return err
-}
-
-func (p *connPool) remove(cn *conn) error {
- p.conns.Remove(cn.elem)
- cn.elem = nil
- if !cn.inUse {
- p.idleNum--
- }
- return cn.Close()
-}
-
-// Len returns number of idle connections.
-func (p *connPool) Len() int {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- return p.idleNum
-}
-
-// Size returns number of connections in the pool.
-func (p *connPool) Size() int {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- return p.conns.Len()
-}
-
-func (p *connPool) Filter(f func(*conn) bool) {
- p.cond.L.Lock()
- for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
- next = el.Next()
- cn := el.Value.(*conn)
- if !f(cn) {
- p.remove(cn)
- }
- }
- p.cond.L.Unlock()
-}
-
-func (p *connPool) Close() error {
- defer p.cond.L.Unlock()
- p.cond.L.Lock()
- if p.closed {
- return nil
- }
- p.closed = true
- p.rl.Close()
- var retErr error
- for {
- e := p.conns.Front()
- if e == nil {
- break
- }
- if err := p.remove(e.Value.(*conn)); err != nil {
- log.Printf("cn.Close failed: %s", err)
- retErr = err
- }
- }
- return retErr
-}
-
-//------------------------------------------------------------------------------
-
-type singleConnPool struct {
- pool pool
-
- cnMtx sync.Mutex
- cn *conn
-
- reusable bool
-
- closed bool
-}
-
-func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
- return &singleConnPool{
- pool: pool,
- reusable: reusable,
- }
-}
-
-func (p *singleConnPool) SetConn(cn *conn) {
- p.cnMtx.Lock()
- p.cn = cn
- p.cnMtx.Unlock()
-}
-
-func (p *singleConnPool) Get() (*conn, bool, error) {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
-
- if p.closed {
- return nil, false, errClosed
- }
- if p.cn != nil {
- return p.cn, false, nil
- }
-
- cn, isNew, err := p.pool.Get()
- if err != nil {
- return nil, false, err
- }
- p.cn = cn
-
- return p.cn, isNew, nil
-}
-
-func (p *singleConnPool) Put(cn *conn) error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn != cn {
- panic("p.cn != cn")
- }
- if p.closed {
- return errClosed
- }
- return nil
-}
-
-func (p *singleConnPool) put() error {
- err := p.pool.Put(p.cn)
- p.cn = nil
- return err
-}
-
-func (p *singleConnPool) Remove(cn *conn) error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- panic("p.cn == nil")
- }
- if p.cn != cn {
- panic("p.cn != cn")
- }
- if p.closed {
- return errClosed
- }
- return p.remove()
-}
-
-func (p *singleConnPool) remove() error {
- err := p.pool.Remove(p.cn)
- p.cn = nil
- return err
-}
-
-func (p *singleConnPool) Len() int {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- return 0
- }
- return 1
-}
-
-func (p *singleConnPool) Size() int {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.cn == nil {
- return 0
- }
- return 1
-}
-
-func (p *singleConnPool) Filter(f func(*conn) bool) {
- p.cnMtx.Lock()
- if p.cn != nil {
- if !f(p.cn) {
- p.remove()
- }
- }
- p.cnMtx.Unlock()
-}
-
-func (p *singleConnPool) Close() error {
- defer p.cnMtx.Unlock()
- p.cnMtx.Lock()
- if p.closed {
- return nil
- }
- p.closed = true
- var err error
- if p.cn != nil {
- if p.reusable {
- err = p.put()
- } else {
- err = p.remove()
- }
- }
- return err
-}