summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-redis')
-rw-r--r--vendor/github.com/go-redis/redis/commands.go5
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool.go19
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/scan.go3
-rw-r--r--vendor/github.com/go-redis/redis/internal/util.go37
-rw-r--r--vendor/github.com/go-redis/redis/pubsub.go36
-rw-r--r--vendor/github.com/go-redis/redis/pubsub_test.go16
-rw-r--r--vendor/github.com/go-redis/redis/race_test.go9
7 files changed, 87 insertions, 38 deletions
diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go
index a3b90f12d..c04b3c49b 100644
--- a/vendor/github.com/go-redis/redis/commands.go
+++ b/vendor/github.com/go-redis/redis/commands.go
@@ -214,6 +214,7 @@ type Cmdable interface {
ScriptKill() *StatusCmd
ScriptLoad(script string) *StringCmd
DebugObject(key string) *StringCmd
+ Publish(channel string, message interface{}) *IntCmd
PubSubChannels(pattern string) *StringSliceCmd
PubSubNumSub(channels ...string) *StringIntMapCmd
PubSubNumPat() *IntCmd
@@ -1880,8 +1881,8 @@ func (c *cmdable) DebugObject(key string) *StringCmd {
//------------------------------------------------------------------------------
// Publish posts the message to the channel.
-func (c *cmdable) Publish(channel, message string) *IntCmd {
- cmd := NewIntCmd("PUBLISH", channel, message)
+func (c *cmdable) Publish(channel string, message interface{}) *IntCmd {
+ cmd := NewIntCmd("publish", channel, message)
c.process(cmd)
return cmd
}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
index 836ec1045..ae81905ea 100644
--- a/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -60,8 +60,10 @@ type Options struct {
type ConnPool struct {
opt *Options
- dialErrorsNum uint32 // atomic
- _lastDialError atomic.Value
+ dialErrorsNum uint32 // atomic
+
+ lastDialError error
+ lastDialErrorMu sync.RWMutex
queue chan struct{}
@@ -98,7 +100,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
- return nil, p.lastDialError()
+ return nil, p.getLastDialError()
}
netConn, err := p.opt.Dialer()
@@ -138,11 +140,16 @@ func (p *ConnPool) tryDial() {
}
func (p *ConnPool) setLastDialError(err error) {
- p._lastDialError.Store(err)
+ p.lastDialErrorMu.Lock()
+ p.lastDialError = err
+ p.lastDialErrorMu.Unlock()
}
-func (p *ConnPool) lastDialError() error {
- return p._lastDialError.Load().(error)
+func (p *ConnPool) getLastDialError() error {
+ p.lastDialErrorMu.RLock()
+ err := p.lastDialError
+ p.lastDialErrorMu.RUnlock()
+ return err
}
// Get returns existed connection from the pool or creates a new one.
diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go
index 0431a877d..03c8b59aa 100644
--- a/vendor/github.com/go-redis/redis/internal/proto/scan.go
+++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go
@@ -120,8 +120,9 @@ func ScanSlice(data []string, slice interface{}) error {
return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice)
}
+ next := internal.MakeSliceNextElemFunc(v)
for i, s := range data {
- elem := internal.SliceNextElem(v)
+ elem := next()
if err := Scan(internal.StringToBytes(s), elem.Addr().Interface()); err != nil {
return fmt.Errorf("redis: ScanSlice(index=%d value=%q) failed: %s", i, s, err)
}
diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go
index 520596fd9..1ba9805fe 100644
--- a/vendor/github.com/go-redis/redis/internal/util.go
+++ b/vendor/github.com/go-redis/redis/internal/util.go
@@ -28,20 +28,35 @@ func isLower(s string) bool {
return true
}
-func SliceNextElem(v reflect.Value) reflect.Value {
- if v.Len() < v.Cap() {
- v.Set(v.Slice(0, v.Len()+1))
- return v.Index(v.Len() - 1)
- }
-
+func MakeSliceNextElemFunc(v reflect.Value) func() reflect.Value {
elemType := v.Type().Elem()
if elemType.Kind() == reflect.Ptr {
- elem := reflect.New(elemType.Elem())
- v.Set(reflect.Append(v, elem))
- return elem.Elem()
+ elemType = elemType.Elem()
+ return func() reflect.Value {
+ if v.Len() < v.Cap() {
+ v.Set(v.Slice(0, v.Len()+1))
+ elem := v.Index(v.Len() - 1)
+ if elem.IsNil() {
+ elem.Set(reflect.New(elemType))
+ }
+ return elem.Elem()
+ }
+
+ elem := reflect.New(elemType)
+ v.Set(reflect.Append(v, elem))
+ return elem.Elem()
+ }
}
- v.Set(reflect.Append(v, reflect.Zero(elemType)))
- return v.Index(v.Len() - 1)
+ zero := reflect.Zero(elemType)
+ return func() reflect.Value {
+ if v.Len() < v.Cap() {
+ v.Set(v.Slice(0, v.Len()+1))
+ return v.Index(v.Len() - 1)
+ }
+
+ v.Set(reflect.Append(v, zero))
+ return v.Index(v.Len() - 1)
+ }
}
diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go
index e754a16f2..01f8a61aa 100644
--- a/vendor/github.com/go-redis/redis/pubsub.go
+++ b/vendor/github.com/go-redis/redis/pubsub.go
@@ -29,6 +29,9 @@ type PubSub struct {
closed bool
cmd *Cmd
+
+ chOnce sync.Once
+ ch chan *Message
}
func (c *PubSub) conn() (*pool.Conn, error) {
@@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
}
}
-// Channel returns a channel for concurrently receiving messages.
-// The channel is closed with PubSub.
+// Channel returns a Go channel for concurrently receiving messages.
+// The channel is closed with PubSub. Receive or ReceiveMessage APIs
+// can not be used after channel is created.
func (c *PubSub) Channel() <-chan *Message {
- ch := make(chan *Message, 100)
- go func() {
- for {
- msg, err := c.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- break
+ c.chOnce.Do(func() {
+ c.ch = make(chan *Message, 100)
+ go func() {
+ for {
+ msg, err := c.ReceiveMessage()
+ if err != nil {
+ if err == pool.ErrClosed {
+ break
+ }
+ continue
}
- continue
+ c.ch <- msg
}
- ch <- msg
- }
- close(ch)
- }()
- return ch
+ close(c.ch)
+ }()
+ })
+ return c.ch
}
func appendIfNotExists(ss []string, es ...string) []string {
diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go
index 6fc04a198..6a85bd038 100644
--- a/vendor/github.com/go-redis/redis/pubsub_test.go
+++ b/vendor/github.com/go-redis/redis/pubsub_test.go
@@ -424,4 +424,20 @@ var _ = Describe("PubSub", func() {
wg.Wait()
})
+
+ It("handles big message payload", func() {
+ pubsub := client.Subscribe("mychannel")
+ defer pubsub.Close()
+
+ ch := pubsub.Channel()
+
+ bigVal := bigVal()
+ err := client.Publish("mychannel", bigVal).Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ var msg *redis.Message
+ Eventually(ch).Should(Receive(&msg))
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal(string(bigVal)))
+ })
})
diff --git a/vendor/github.com/go-redis/redis/race_test.go b/vendor/github.com/go-redis/redis/race_test.go
index 5bcb0768e..14264086c 100644
--- a/vendor/github.com/go-redis/redis/race_test.go
+++ b/vendor/github.com/go-redis/redis/race_test.go
@@ -105,7 +105,7 @@ var _ = Describe("races", func() {
It("should handle big vals in Get", func() {
C, N = 4, 100
- bigVal := bytes.Repeat([]byte{'*'}, 1<<17) // 128kb
+ bigVal := bigVal()
err := client.Set("key", bigVal, 0).Err()
Expect(err).NotTo(HaveOccurred())
@@ -126,8 +126,7 @@ var _ = Describe("races", func() {
It("should handle big vals in Set", func() {
C, N = 4, 100
- bigVal := bytes.Repeat([]byte{'*'}, 1<<17) // 128kb
-
+ bigVal := bigVal()
perform(C, func(id int) {
for i := 0; i < N; i++ {
err := client.Set("key", bigVal, 0).Err()
@@ -245,3 +244,7 @@ var _ = Describe("races", func() {
Expect(n).To(Equal(int64(N)))
})
})
+
+func bigVal() []byte {
+ return bytes.Repeat([]byte{'*'}, 1<<17) // 128kb
+}