diff options
Diffstat (limited to 'vendor/github.com/garyburd/redigo/redisx')
-rw-r--r-- | vendor/github.com/garyburd/redigo/redisx/connmux.go | 152 | ||||
-rw-r--r-- | vendor/github.com/garyburd/redigo/redisx/connmux_test.go | 259 | ||||
-rw-r--r-- | vendor/github.com/garyburd/redigo/redisx/doc.go | 17 |
3 files changed, 0 insertions, 428 deletions
diff --git a/vendor/github.com/garyburd/redigo/redisx/connmux.go b/vendor/github.com/garyburd/redigo/redisx/connmux.go deleted file mode 100644 index af2cced3f..000000000 --- a/vendor/github.com/garyburd/redigo/redisx/connmux.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2014 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redisx - -import ( - "errors" - "sync" - - "github.com/garyburd/redigo/internal" - "github.com/garyburd/redigo/redis" -) - -// ConnMux multiplexes one or more connections to a single underlying -// connection. The ConnMux connections do not support concurrency, commands -// that associate server side state with the connection or commands that put -// the connection in a special mode. -type ConnMux struct { - c redis.Conn - - sendMu sync.Mutex - sendID uint - - recvMu sync.Mutex - recvID uint - recvWait map[uint]chan struct{} -} - -func NewConnMux(c redis.Conn) *ConnMux { - return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})} -} - -// Get gets a connection. The application must close the returned connection. -func (p *ConnMux) Get() redis.Conn { - c := &muxConn{p: p} - c.ids = c.buf[:0] - return c -} - -// Close closes the underlying connection. -func (p *ConnMux) Close() error { - return p.c.Close() -} - -type muxConn struct { - p *ConnMux - ids []uint - buf [8]uint -} - -func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error { - if internal.LookupCommandInfo(cmd).Set != 0 { - return errors.New("command not supported by mux pool") - } - p := c.p - p.sendMu.Lock() - id := p.sendID - c.ids = append(c.ids, id) - p.sendID++ - err := p.c.Send(cmd, args...) - if flush { - err = p.c.Flush() - } - p.sendMu.Unlock() - return err -} - -func (c *muxConn) Send(cmd string, args ...interface{}) error { - return c.send(false, cmd, args...) -} - -func (c *muxConn) Flush() error { - p := c.p - p.sendMu.Lock() - err := p.c.Flush() - p.sendMu.Unlock() - return err -} - -func (c *muxConn) Receive() (interface{}, error) { - if len(c.ids) == 0 { - return nil, errors.New("mux pool underflow") - } - - id := c.ids[0] - c.ids = c.ids[1:] - if len(c.ids) == 0 { - c.ids = c.buf[:0] - } - - p := c.p - p.recvMu.Lock() - if p.recvID != id { - ch := make(chan struct{}) - p.recvWait[id] = ch - p.recvMu.Unlock() - <-ch - p.recvMu.Lock() - if p.recvID != id { - panic("out of sync") - } - } - - v, err := p.c.Receive() - - id++ - p.recvID = id - ch, ok := p.recvWait[id] - if ok { - delete(p.recvWait, id) - } - p.recvMu.Unlock() - if ok { - ch <- struct{}{} - } - - return v, err -} - -func (c *muxConn) Close() error { - var err error - if len(c.ids) == 0 { - return nil - } - c.Flush() - for _ = range c.ids { - _, err = c.Receive() - } - return err -} - -func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) { - if err := c.send(true, cmd, args...); err != nil { - return nil, err - } - return c.Receive() -} - -func (c *muxConn) Err() error { - return c.p.c.Err() -} diff --git a/vendor/github.com/garyburd/redigo/redisx/connmux_test.go b/vendor/github.com/garyburd/redigo/redisx/connmux_test.go deleted file mode 100644 index 9c3c8b162..000000000 --- a/vendor/github.com/garyburd/redigo/redisx/connmux_test.go +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2014 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -package redisx_test - -import ( - "net/textproto" - "sync" - "testing" - - "github.com/garyburd/redigo/internal/redistest" - "github.com/garyburd/redigo/redis" - "github.com/garyburd/redigo/redisx" -) - -func TestConnMux(t *testing.T) { - c, err := redistest.Dial() - if err != nil { - t.Fatalf("error connection to database, %v", err) - } - m := redisx.NewConnMux(c) - defer m.Close() - - c1 := m.Get() - c2 := m.Get() - c1.Send("ECHO", "hello") - c2.Send("ECHO", "world") - c1.Flush() - c2.Flush() - s, err := redis.String(c1.Receive()) - if err != nil { - t.Fatal(err) - } - if s != "hello" { - t.Fatalf("echo returned %q, want %q", s, "hello") - } - s, err = redis.String(c2.Receive()) - if err != nil { - t.Fatal(err) - } - if s != "world" { - t.Fatalf("echo returned %q, want %q", s, "world") - } - c1.Close() - c2.Close() -} - -func TestConnMuxClose(t *testing.T) { - c, err := redistest.Dial() - if err != nil { - t.Fatalf("error connection to database, %v", err) - } - m := redisx.NewConnMux(c) - defer m.Close() - - c1 := m.Get() - c2 := m.Get() - - if err := c1.Send("ECHO", "hello"); err != nil { - t.Fatal(err) - } - if err := c1.Close(); err != nil { - t.Fatal(err) - } - - if err := c2.Send("ECHO", "world"); err != nil { - t.Fatal(err) - } - if err := c2.Flush(); err != nil { - t.Fatal(err) - } - - s, err := redis.String(c2.Receive()) - if err != nil { - t.Fatal(err) - } - if s != "world" { - t.Fatalf("echo returned %q, want %q", s, "world") - } - c2.Close() -} - -func BenchmarkConn(b *testing.B) { - b.StopTimer() - c, err := redistest.Dial() - if err != nil { - b.Fatalf("error connection to database, %v", err) - } - defer c.Close() - b.StartTimer() - - for i := 0; i < b.N; i++ { - if _, err := c.Do("PING"); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkConnMux(b *testing.B) { - b.StopTimer() - c, err := redistest.Dial() - if err != nil { - b.Fatalf("error connection to database, %v", err) - } - m := redisx.NewConnMux(c) - defer m.Close() - - b.StartTimer() - - for i := 0; i < b.N; i++ { - c := m.Get() - if _, err := c.Do("PING"); err != nil { - b.Fatal(err) - } - c.Close() - } -} - -func BenchmarkPool(b *testing.B) { - b.StopTimer() - - p := redis.Pool{Dial: redistest.Dial, MaxIdle: 1} - defer p.Close() - - // Fill the pool. - c := p.Get() - if err := c.Err(); err != nil { - b.Fatal(err) - } - c.Close() - - b.StartTimer() - - for i := 0; i < b.N; i++ { - c := p.Get() - if _, err := c.Do("PING"); err != nil { - b.Fatal(err) - } - c.Close() - } -} - -const numConcurrent = 10 - -func BenchmarkConnMuxConcurrent(b *testing.B) { - b.StopTimer() - c, err := redistest.Dial() - if err != nil { - b.Fatalf("error connection to database, %v", err) - } - defer c.Close() - - m := redisx.NewConnMux(c) - - var wg sync.WaitGroup - wg.Add(numConcurrent) - - b.StartTimer() - - for i := 0; i < numConcurrent; i++ { - go func() { - defer wg.Done() - for i := 0; i < b.N; i++ { - c := m.Get() - if _, err := c.Do("PING"); err != nil { - b.Fatal(err) - } - c.Close() - } - }() - } - wg.Wait() -} - -func BenchmarkPoolConcurrent(b *testing.B) { - b.StopTimer() - - p := redis.Pool{Dial: redistest.Dial, MaxIdle: numConcurrent} - defer p.Close() - - // Fill the pool. - conns := make([]redis.Conn, numConcurrent) - for i := range conns { - c := p.Get() - if err := c.Err(); err != nil { - b.Fatal(err) - } - conns[i] = c - } - for _, c := range conns { - c.Close() - } - - var wg sync.WaitGroup - wg.Add(numConcurrent) - - b.StartTimer() - - for i := 0; i < numConcurrent; i++ { - go func() { - defer wg.Done() - for i := 0; i < b.N; i++ { - c := p.Get() - if _, err := c.Do("PING"); err != nil { - b.Fatal(err) - } - c.Close() - } - }() - } - wg.Wait() -} - -func BenchmarkPipelineConcurrency(b *testing.B) { - b.StopTimer() - c, err := redistest.Dial() - if err != nil { - b.Fatalf("error connection to database, %v", err) - } - defer c.Close() - - var wg sync.WaitGroup - wg.Add(numConcurrent) - - var pipeline textproto.Pipeline - - b.StartTimer() - - for i := 0; i < numConcurrent; i++ { - go func() { - defer wg.Done() - for i := 0; i < b.N; i++ { - id := pipeline.Next() - pipeline.StartRequest(id) - c.Send("PING") - c.Flush() - pipeline.EndRequest(id) - pipeline.StartResponse(id) - _, err := c.Receive() - if err != nil { - b.Fatal(err) - } - pipeline.EndResponse(id) - } - }() - } - wg.Wait() -} diff --git a/vendor/github.com/garyburd/redigo/redisx/doc.go b/vendor/github.com/garyburd/redigo/redisx/doc.go deleted file mode 100644 index 91653dbe2..000000000 --- a/vendor/github.com/garyburd/redigo/redisx/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2012 Gary Burd -// -// Licensed under the Apache License, Version 2.0 (the "License"): you may -// not use this file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. - -// Package redisx contains experimental features for Redigo. Features in this -// package may be modified or deleted at any time. -package redisx // import "github.com/garyburd/redigo/redisx" |