From 1329aa51b605cb54ba9aae3a82a0a87b881fb7b3 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 13 Nov 2017 09:09:58 -0800 Subject: Updating server dependancies. (#7816) --- vendor/github.com/go-redis/redis/commands.go | 5 +-- .../go-redis/redis/internal/pool/pool.go | 19 +++++++---- .../go-redis/redis/internal/proto/scan.go | 3 +- vendor/github.com/go-redis/redis/internal/util.go | 37 +++++++++++++++------- vendor/github.com/go-redis/redis/pubsub.go | 36 ++++++++++++--------- vendor/github.com/go-redis/redis/pubsub_test.go | 16 ++++++++++ vendor/github.com/go-redis/redis/race_test.go | 9 ++++-- 7 files changed, 87 insertions(+), 38 deletions(-) (limited to 'vendor/github.com/go-redis/redis') diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index a3b90f12d..c04b3c49b 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -214,6 +214,7 @@ type Cmdable interface { ScriptKill() *StatusCmd ScriptLoad(script string) *StringCmd DebugObject(key string) *StringCmd + Publish(channel string, message interface{}) *IntCmd PubSubChannels(pattern string) *StringSliceCmd PubSubNumSub(channels ...string) *StringIntMapCmd PubSubNumPat() *IntCmd @@ -1880,8 +1881,8 @@ func (c *cmdable) DebugObject(key string) *StringCmd { //------------------------------------------------------------------------------ // Publish posts the message to the channel. -func (c *cmdable) Publish(channel, message string) *IntCmd { - cmd := NewIntCmd("PUBLISH", channel, message) +func (c *cmdable) Publish(channel string, message interface{}) *IntCmd { + cmd := NewIntCmd("publish", channel, message) c.process(cmd) return cmd } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index 836ec1045..ae81905ea 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -60,8 +60,10 @@ type Options struct { type ConnPool struct { opt *Options - dialErrorsNum uint32 // atomic - _lastDialError atomic.Value + dialErrorsNum uint32 // atomic + + lastDialError error + lastDialErrorMu sync.RWMutex queue chan struct{} @@ -98,7 +100,7 @@ func (p *ConnPool) NewConn() (*Conn, error) { } if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { - return nil, p.lastDialError() + return nil, p.getLastDialError() } netConn, err := p.opt.Dialer() @@ -138,11 +140,16 @@ func (p *ConnPool) tryDial() { } func (p *ConnPool) setLastDialError(err error) { - p._lastDialError.Store(err) + p.lastDialErrorMu.Lock() + p.lastDialError = err + p.lastDialErrorMu.Unlock() } -func (p *ConnPool) lastDialError() error { - return p._lastDialError.Load().(error) +func (p *ConnPool) getLastDialError() error { + p.lastDialErrorMu.RLock() + err := p.lastDialError + p.lastDialErrorMu.RUnlock() + return err } // Get returns existed connection from the pool or creates a new one. diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go index 0431a877d..03c8b59aa 100644 --- a/vendor/github.com/go-redis/redis/internal/proto/scan.go +++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go @@ -120,8 +120,9 @@ func ScanSlice(data []string, slice interface{}) error { return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice) } + next := internal.MakeSliceNextElemFunc(v) for i, s := range data { - elem := internal.SliceNextElem(v) + elem := next() if err := Scan(internal.StringToBytes(s), elem.Addr().Interface()); err != nil { return fmt.Errorf("redis: ScanSlice(index=%d value=%q) failed: %s", i, s, err) } diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go index 520596fd9..1ba9805fe 100644 --- a/vendor/github.com/go-redis/redis/internal/util.go +++ b/vendor/github.com/go-redis/redis/internal/util.go @@ -28,20 +28,35 @@ func isLower(s string) bool { return true } -func SliceNextElem(v reflect.Value) reflect.Value { - if v.Len() < v.Cap() { - v.Set(v.Slice(0, v.Len()+1)) - return v.Index(v.Len() - 1) - } - +func MakeSliceNextElemFunc(v reflect.Value) func() reflect.Value { elemType := v.Type().Elem() if elemType.Kind() == reflect.Ptr { - elem := reflect.New(elemType.Elem()) - v.Set(reflect.Append(v, elem)) - return elem.Elem() + elemType = elemType.Elem() + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + elem := v.Index(v.Len() - 1) + if elem.IsNil() { + elem.Set(reflect.New(elemType)) + } + return elem.Elem() + } + + elem := reflect.New(elemType) + v.Set(reflect.Append(v, elem)) + return elem.Elem() + } } - v.Set(reflect.Append(v, reflect.Zero(elemType))) - return v.Index(v.Len() - 1) + zero := reflect.Zero(elemType) + return func() reflect.Value { + if v.Len() < v.Cap() { + v.Set(v.Slice(0, v.Len()+1)) + return v.Index(v.Len() - 1) + } + + v.Set(reflect.Append(v, zero)) + return v.Index(v.Len() - 1) + } } diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index e754a16f2..01f8a61aa 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -29,6 +29,9 @@ type PubSub struct { closed bool cmd *Cmd + + chOnce sync.Once + ch chan *Message } func (c *PubSub) conn() (*pool.Conn, error) { @@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { } } -// Channel returns a channel for concurrently receiving messages. -// The channel is closed with PubSub. +// Channel returns a Go channel for concurrently receiving messages. +// The channel is closed with PubSub. Receive or ReceiveMessage APIs +// can not be used after channel is created. func (c *PubSub) Channel() <-chan *Message { - ch := make(chan *Message, 100) - go func() { - for { - msg, err := c.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - break + c.chOnce.Do(func() { + c.ch = make(chan *Message, 100) + go func() { + for { + msg, err := c.ReceiveMessage() + if err != nil { + if err == pool.ErrClosed { + break + } + continue } - continue + c.ch <- msg } - ch <- msg - } - close(ch) - }() - return ch + close(c.ch) + }() + }) + return c.ch } func appendIfNotExists(ss []string, es ...string) []string { diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go index 6fc04a198..6a85bd038 100644 --- a/vendor/github.com/go-redis/redis/pubsub_test.go +++ b/vendor/github.com/go-redis/redis/pubsub_test.go @@ -424,4 +424,20 @@ var _ = Describe("PubSub", func() { wg.Wait() }) + + It("handles big message payload", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + ch := pubsub.Channel() + + bigVal := bigVal() + err := client.Publish("mychannel", bigVal).Err() + Expect(err).NotTo(HaveOccurred()) + + var msg *redis.Message + Eventually(ch).Should(Receive(&msg)) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal(string(bigVal))) + }) }) diff --git a/vendor/github.com/go-redis/redis/race_test.go b/vendor/github.com/go-redis/redis/race_test.go index 5bcb0768e..14264086c 100644 --- a/vendor/github.com/go-redis/redis/race_test.go +++ b/vendor/github.com/go-redis/redis/race_test.go @@ -105,7 +105,7 @@ var _ = Describe("races", func() { It("should handle big vals in Get", func() { C, N = 4, 100 - bigVal := bytes.Repeat([]byte{'*'}, 1<<17) // 128kb + bigVal := bigVal() err := client.Set("key", bigVal, 0).Err() Expect(err).NotTo(HaveOccurred()) @@ -126,8 +126,7 @@ var _ = Describe("races", func() { It("should handle big vals in Set", func() { C, N = 4, 100 - bigVal := bytes.Repeat([]byte{'*'}, 1<<17) // 128kb - + bigVal := bigVal() perform(C, func(id int) { for i := 0; i < N; i++ { err := client.Set("key", bigVal, 0).Err() @@ -245,3 +244,7 @@ var _ = Describe("races", func() { Expect(n).To(Equal(int64(N))) }) }) + +func bigVal() []byte { + return bytes.Repeat([]byte{'*'}, 1<<17) // 128kb +} -- cgit v1.2.3-1-g7c22