summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/go-redis/redis/internal
diff options
context:
space:
mode:
authorChristopher Speller <crspeller@gmail.com>2017-07-31 08:15:23 -0700
committerGitHub <noreply@github.com>2017-07-31 08:15:23 -0700
commit09b49c26ddfdb20ced61e7dfd4192e750ce40449 (patch)
tree1288d069cc8a199b8eb3b858935dffd377ee3d2d /vendor/github.com/go-redis/redis/internal
parent6f4e38d129ffaf469d40fc8596d3957ee94d21e9 (diff)
downloadchat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.tar.gz
chat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.tar.bz2
chat-09b49c26ddfdb20ced61e7dfd4192e750ce40449.zip
PLT-5308 Caching layer part 2 (#6973)
* Adding Reaction store cache layer example * Implementing reaction store in new caching system. * Redis for reaction store * Adding redis library * Adding invalidation for DeleteAllWithEmojiName and other minor enhancements
Diffstat (limited to 'vendor/github.com/go-redis/redis/internal')
-rw-r--r--vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go81
-rw-r--r--vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash_test.go110
-rw-r--r--vendor/github.com/go-redis/redis/internal/errors.go75
-rw-r--r--vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go73
-rw-r--r--vendor/github.com/go-redis/redis/internal/hashtag/hashtag_test.go74
-rw-r--r--vendor/github.com/go-redis/redis/internal/internal.go23
-rw-r--r--vendor/github.com/go-redis/redis/internal/internal_test.go17
-rw-r--r--vendor/github.com/go-redis/redis/internal/log.go15
-rw-r--r--vendor/github.com/go-redis/redis/internal/once.go60
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/bench_test.go80
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/conn.go78
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/main_test.go35
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool.go367
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool_single.go55
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go123
-rw-r--r--vendor/github.com/go-redis/redis/internal/pool/pool_test.go241
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/proto_test.go13
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/reader.go334
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/reader_test.go87
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/scan.go131
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/scan_test.go48
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/write_buffer.go103
-rw-r--r--vendor/github.com/go-redis/redis/internal/proto/write_buffer_test.go63
-rw-r--r--vendor/github.com/go-redis/redis/internal/safe.go11
-rw-r--r--vendor/github.com/go-redis/redis/internal/unsafe.go27
-rw-r--r--vendor/github.com/go-redis/redis/internal/util.go47
26 files changed, 2371 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go
new file mode 100644
index 000000000..a9c56f076
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package consistenthash provides an implementation of a ring hash.
+package consistenthash
+
+import (
+ "hash/crc32"
+ "sort"
+ "strconv"
+)
+
+type Hash func(data []byte) uint32
+
+type Map struct {
+ hash Hash
+ replicas int
+ keys []int // Sorted
+ hashMap map[int]string
+}
+
+func New(replicas int, fn Hash) *Map {
+ m := &Map{
+ replicas: replicas,
+ hash: fn,
+ hashMap: make(map[int]string),
+ }
+ if m.hash == nil {
+ m.hash = crc32.ChecksumIEEE
+ }
+ return m
+}
+
+// Returns true if there are no items available.
+func (m *Map) IsEmpty() bool {
+ return len(m.keys) == 0
+}
+
+// Adds some keys to the hash.
+func (m *Map) Add(keys ...string) {
+ for _, key := range keys {
+ for i := 0; i < m.replicas; i++ {
+ hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
+ m.keys = append(m.keys, hash)
+ m.hashMap[hash] = key
+ }
+ }
+ sort.Ints(m.keys)
+}
+
+// Gets the closest item in the hash to the provided key.
+func (m *Map) Get(key string) string {
+ if m.IsEmpty() {
+ return ""
+ }
+
+ hash := int(m.hash([]byte(key)))
+
+ // Binary search for appropriate replica.
+ idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
+
+ // Means we have cycled back to the first replica.
+ if idx == len(m.keys) {
+ idx = 0
+ }
+
+ return m.hashMap[m.keys[idx]]
+}
diff --git a/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash_test.go b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash_test.go
new file mode 100644
index 000000000..1a37fd7ff
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash_test.go
@@ -0,0 +1,110 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consistenthash
+
+import (
+ "fmt"
+ "strconv"
+ "testing"
+)
+
+func TestHashing(t *testing.T) {
+
+ // Override the hash function to return easier to reason about values. Assumes
+ // the keys can be converted to an integer.
+ hash := New(3, func(key []byte) uint32 {
+ i, err := strconv.Atoi(string(key))
+ if err != nil {
+ panic(err)
+ }
+ return uint32(i)
+ })
+
+ // Given the above hash function, this will give replicas with "hashes":
+ // 2, 4, 6, 12, 14, 16, 22, 24, 26
+ hash.Add("6", "4", "2")
+
+ testCases := map[string]string{
+ "2": "2",
+ "11": "2",
+ "23": "4",
+ "27": "2",
+ }
+
+ for k, v := range testCases {
+ if hash.Get(k) != v {
+ t.Errorf("Asking for %s, should have yielded %s", k, v)
+ }
+ }
+
+ // Adds 8, 18, 28
+ hash.Add("8")
+
+ // 27 should now map to 8.
+ testCases["27"] = "8"
+
+ for k, v := range testCases {
+ if hash.Get(k) != v {
+ t.Errorf("Asking for %s, should have yielded %s", k, v)
+ }
+ }
+
+}
+
+func TestConsistency(t *testing.T) {
+ hash1 := New(1, nil)
+ hash2 := New(1, nil)
+
+ hash1.Add("Bill", "Bob", "Bonny")
+ hash2.Add("Bob", "Bonny", "Bill")
+
+ if hash1.Get("Ben") != hash2.Get("Ben") {
+ t.Errorf("Fetching 'Ben' from both hashes should be the same")
+ }
+
+ hash2.Add("Becky", "Ben", "Bobby")
+
+ if hash1.Get("Ben") != hash2.Get("Ben") ||
+ hash1.Get("Bob") != hash2.Get("Bob") ||
+ hash1.Get("Bonny") != hash2.Get("Bonny") {
+ t.Errorf("Direct matches should always return the same entry")
+ }
+
+}
+
+func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
+func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
+func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
+func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }
+
+func benchmarkGet(b *testing.B, shards int) {
+
+ hash := New(50, nil)
+
+ var buckets []string
+ for i := 0; i < shards; i++ {
+ buckets = append(buckets, fmt.Sprintf("shard-%d", i))
+ }
+
+ hash.Add(buckets...)
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ hash.Get(buckets[i&(shards-1)])
+ }
+}
diff --git a/vendor/github.com/go-redis/redis/internal/errors.go b/vendor/github.com/go-redis/redis/internal/errors.go
new file mode 100644
index 000000000..c93e00818
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/errors.go
@@ -0,0 +1,75 @@
+package internal
+
+import (
+ "io"
+ "net"
+ "strings"
+)
+
+const Nil = RedisError("redis: nil")
+
+type RedisError string
+
+func (e RedisError) Error() string { return string(e) }
+
+func IsRetryableError(err error) bool {
+ return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
+}
+
+func IsInternalError(err error) bool {
+ _, ok := err.(RedisError)
+ return ok
+}
+
+func IsNetworkError(err error) bool {
+ if err == io.EOF {
+ return true
+ }
+ _, ok := err.(net.Error)
+ return ok
+}
+
+func IsBadConn(err error, allowTimeout bool) bool {
+ if err == nil {
+ return false
+ }
+ if IsInternalError(err) {
+ return false
+ }
+ if allowTimeout {
+ if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ return false
+ }
+ }
+ return true
+}
+
+func IsMovedError(err error) (moved bool, ask bool, addr string) {
+ if !IsInternalError(err) {
+ return
+ }
+
+ s := err.Error()
+ if strings.HasPrefix(s, "MOVED ") {
+ moved = true
+ } else if strings.HasPrefix(s, "ASK ") {
+ ask = true
+ } else {
+ return
+ }
+
+ ind := strings.LastIndex(s, " ")
+ if ind == -1 {
+ return false, false, ""
+ }
+ addr = s[ind+1:]
+ return
+}
+
+func IsLoadingError(err error) bool {
+ return strings.HasPrefix(err.Error(), "LOADING")
+}
+
+func IsExecAbortError(err error) bool {
+ return strings.HasPrefix(err.Error(), "EXECABORT")
+}
diff --git a/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go
new file mode 100644
index 000000000..2866488e5
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go
@@ -0,0 +1,73 @@
+package hashtag
+
+import (
+ "math/rand"
+ "strings"
+)
+
+const SlotNumber = 16384
+
+// CRC16 implementation according to CCITT standards.
+// Copyright 2001-2010 Georges Menie (www.menie.org)
+// Copyright 2013 The Go Authors. All rights reserved.
+// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c
+var crc16tab = [256]uint16{
+ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
+ 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
+ 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
+ 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
+ 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
+ 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
+ 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
+ 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
+ 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
+ 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
+ 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
+ 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
+ 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
+ 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
+ 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
+ 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
+ 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
+ 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
+ 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
+ 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
+ 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
+ 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
+ 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
+ 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
+ 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
+ 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
+ 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
+ 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
+ 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
+ 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
+ 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
+ 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
+}
+
+func Key(key string) string {
+ if s := strings.IndexByte(key, '{'); s > -1 {
+ if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
+ return key[s+1 : s+e+1]
+ }
+ }
+ return key
+}
+
+// hashSlot returns a consistent slot number between 0 and 16383
+// for any given string key.
+func Slot(key string) int {
+ key = Key(key)
+ if key == "" {
+ return rand.Intn(SlotNumber)
+ }
+ return int(crc16sum(key)) % SlotNumber
+}
+
+func crc16sum(key string) (crc uint16) {
+ for i := 0; i < len(key); i++ {
+ crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
+ }
+ return
+}
diff --git a/vendor/github.com/go-redis/redis/internal/hashtag/hashtag_test.go b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag_test.go
new file mode 100644
index 000000000..7f0fedf31
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/hashtag/hashtag_test.go
@@ -0,0 +1,74 @@
+package hashtag
+
+import (
+ "math/rand"
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestGinkgoSuite(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "hashtag")
+}
+
+var _ = Describe("CRC16", func() {
+
+ // http://redis.io/topics/cluster-spec#keys-distribution-model
+ It("should calculate CRC16", func() {
+ tests := []struct {
+ s string
+ n uint16
+ }{
+ {"123456789", 0x31C3},
+ {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 21847},
+ }
+
+ for _, test := range tests {
+ Expect(crc16sum(test.s)).To(Equal(test.n), "for %s", test.s)
+ }
+ })
+
+})
+
+var _ = Describe("HashSlot", func() {
+
+ It("should calculate hash slots", func() {
+ tests := []struct {
+ key string
+ slot int
+ }{
+ {"123456789", 12739},
+ {"{}foo", 9500},
+ {"foo{}", 5542},
+ {"foo{}{bar}", 8363},
+ {"", 10503},
+ {"", 5176},
+ {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 5463},
+ }
+ // Empty keys receive random slot.
+ rand.Seed(100)
+
+ for _, test := range tests {
+ Expect(Slot(test.key)).To(Equal(test.slot), "for %s", test.key)
+ }
+ })
+
+ It("should extract keys from tags", func() {
+ tests := []struct {
+ one, two string
+ }{
+ {"foo{bar}", "bar"},
+ {"{foo}bar", "foo"},
+ {"{user1000}.following", "{user1000}.followers"},
+ {"foo{{bar}}zap", "{bar"},
+ {"foo{bar}{zap}", "bar"},
+ }
+
+ for _, test := range tests {
+ Expect(Slot(test.one)).To(Equal(Slot(test.two)), "for %s <-> %s", test.one, test.two)
+ }
+ })
+
+})
diff --git a/vendor/github.com/go-redis/redis/internal/internal.go b/vendor/github.com/go-redis/redis/internal/internal.go
new file mode 100644
index 000000000..fb4efa5f0
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/internal.go
@@ -0,0 +1,23 @@
+package internal
+
+import (
+ "math/rand"
+ "time"
+)
+
+const retryBackoff = 8 * time.Millisecond
+
+// Retry backoff with jitter sleep to prevent overloaded conditions during intervals
+// https://www.awsarchitectureblog.com/2015/03/backoff.html
+func RetryBackoff(retry int, maxRetryBackoff time.Duration) time.Duration {
+ if retry < 0 {
+ retry = 0
+ }
+
+ backoff := retryBackoff << uint(retry)
+ if backoff > maxRetryBackoff {
+ backoff = maxRetryBackoff
+ }
+
+ return time.Duration(rand.Int63n(int64(backoff)))
+}
diff --git a/vendor/github.com/go-redis/redis/internal/internal_test.go b/vendor/github.com/go-redis/redis/internal/internal_test.go
new file mode 100644
index 000000000..5c7000e1e
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/internal_test.go
@@ -0,0 +1,17 @@
+package internal
+
+import (
+ "testing"
+ . "github.com/onsi/gomega"
+ "time"
+)
+
+func TestRetryBackoff(t *testing.T) {
+ RegisterTestingT(t)
+
+ for i := -1; i<= 8; i++ {
+ backoff := RetryBackoff(i, 512*time.Millisecond)
+ Expect(backoff >= 0).To(BeTrue())
+ Expect(backoff <= 512*time.Millisecond).To(BeTrue())
+ }
+}
diff --git a/vendor/github.com/go-redis/redis/internal/log.go b/vendor/github.com/go-redis/redis/internal/log.go
new file mode 100644
index 000000000..fd14222ee
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/log.go
@@ -0,0 +1,15 @@
+package internal
+
+import (
+ "fmt"
+ "log"
+)
+
+var Logger *log.Logger
+
+func Logf(s string, args ...interface{}) {
+ if Logger == nil {
+ return
+ }
+ Logger.Output(2, fmt.Sprintf(s, args...))
+}
diff --git a/vendor/github.com/go-redis/redis/internal/once.go b/vendor/github.com/go-redis/redis/internal/once.go
new file mode 100644
index 000000000..64f46272a
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/once.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+// A Once will perform a successful action exactly once.
+//
+// Unlike a sync.Once, this Once's func returns an error
+// and is re-armed on failure.
+type Once struct {
+ m sync.Mutex
+ done uint32
+}
+
+// Do calls the function f if and only if Do has not been invoked
+// without error for this instance of Once. In other words, given
+// var once Once
+// if once.Do(f) is called multiple times, only the first call will
+// invoke f, even if f has a different value in each invocation unless
+// f returns an error. A new instance of Once is required for each
+// function to execute.
+//
+// Do is intended for initialization that must be run exactly once. Since f
+// is niladic, it may be necessary to use a function literal to capture the
+// arguments to a function to be invoked by Do:
+// err := config.once.Do(func() error { return config.init(filename) })
+func (o *Once) Do(f func() error) error {
+ if atomic.LoadUint32(&o.done) == 1 {
+ return nil
+ }
+ // Slow-path.
+ o.m.Lock()
+ defer o.m.Unlock()
+ var err error
+ if o.done == 0 {
+ err = f()
+ if err == nil {
+ atomic.StoreUint32(&o.done, 1)
+ }
+ }
+ return err
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/bench_test.go b/vendor/github.com/go-redis/redis/internal/pool/bench_test.go
new file mode 100644
index 000000000..e0bb52446
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/bench_test.go
@@ -0,0 +1,80 @@
+package pool_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/go-redis/redis/internal/pool"
+)
+
+func benchmarkPoolGetPut(b *testing.B, poolSize int) {
+ connPool := pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: poolSize,
+ PoolTimeout: time.Second,
+ IdleTimeout: time.Hour,
+ IdleCheckFrequency: time.Hour,
+ })
+
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cn, _, err := connPool.Get()
+ if err != nil {
+ b.Fatal(err)
+ }
+ if err = connPool.Put(cn); err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+}
+
+func BenchmarkPoolGetPut10Conns(b *testing.B) {
+ benchmarkPoolGetPut(b, 10)
+}
+
+func BenchmarkPoolGetPut100Conns(b *testing.B) {
+ benchmarkPoolGetPut(b, 100)
+}
+
+func BenchmarkPoolGetPut1000Conns(b *testing.B) {
+ benchmarkPoolGetPut(b, 1000)
+}
+
+func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
+ connPool := pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: poolSize,
+ PoolTimeout: time.Second,
+ IdleTimeout: time.Hour,
+ IdleCheckFrequency: time.Hour,
+ })
+
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cn, _, err := connPool.Get()
+ if err != nil {
+ b.Fatal(err)
+ }
+ if err := connPool.Remove(cn); err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+}
+
+func BenchmarkPoolGetRemove10Conns(b *testing.B) {
+ benchmarkPoolGetRemove(b, 10)
+}
+
+func BenchmarkPoolGetRemove100Conns(b *testing.B) {
+ benchmarkPoolGetRemove(b, 100)
+}
+
+func BenchmarkPoolGetRemove1000Conns(b *testing.B) {
+ benchmarkPoolGetRemove(b, 1000)
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/conn.go b/vendor/github.com/go-redis/redis/internal/pool/conn.go
new file mode 100644
index 000000000..8af51d9de
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -0,0 +1,78 @@
+package pool
+
+import (
+ "net"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/internal/proto"
+)
+
+var noDeadline = time.Time{}
+
+type Conn struct {
+ netConn net.Conn
+
+ Rd *proto.Reader
+ Wb *proto.WriteBuffer
+
+ Inited bool
+ usedAt atomic.Value
+}
+
+func NewConn(netConn net.Conn) *Conn {
+ cn := &Conn{
+ netConn: netConn,
+ Wb: proto.NewWriteBuffer(),
+ }
+ cn.Rd = proto.NewReader(cn.netConn)
+ cn.SetUsedAt(time.Now())
+ return cn
+}
+
+func (cn *Conn) UsedAt() time.Time {
+ return cn.usedAt.Load().(time.Time)
+}
+
+func (cn *Conn) SetUsedAt(tm time.Time) {
+ cn.usedAt.Store(tm)
+}
+
+func (cn *Conn) SetNetConn(netConn net.Conn) {
+ cn.netConn = netConn
+ cn.Rd.Reset(netConn)
+}
+
+func (cn *Conn) IsStale(timeout time.Duration) bool {
+ return timeout > 0 && time.Since(cn.UsedAt()) > timeout
+}
+
+func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
+ now := time.Now()
+ cn.SetUsedAt(now)
+ if timeout > 0 {
+ return cn.netConn.SetReadDeadline(now.Add(timeout))
+ }
+ return cn.netConn.SetReadDeadline(noDeadline)
+}
+
+func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
+ now := time.Now()
+ cn.SetUsedAt(now)
+ if timeout > 0 {
+ return cn.netConn.SetWriteDeadline(now.Add(timeout))
+ }
+ return cn.netConn.SetWriteDeadline(noDeadline)
+}
+
+func (cn *Conn) Write(b []byte) (int, error) {
+ return cn.netConn.Write(b)
+}
+
+func (cn *Conn) RemoteAddr() net.Addr {
+ return cn.netConn.RemoteAddr()
+}
+
+func (cn *Conn) Close() error {
+ return cn.netConn.Close()
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/main_test.go b/vendor/github.com/go-redis/redis/internal/pool/main_test.go
new file mode 100644
index 000000000..43afe3fa9
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/main_test.go
@@ -0,0 +1,35 @@
+package pool_test
+
+import (
+ "net"
+ "sync"
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestGinkgoSuite(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "pool")
+}
+
+func perform(n int, cbs ...func(int)) {
+ var wg sync.WaitGroup
+ for _, cb := range cbs {
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(cb func(int), i int) {
+ defer GinkgoRecover()
+ defer wg.Done()
+
+ cb(i)
+ }(cb, i)
+ }
+ }
+ wg.Wait()
+}
+
+func dummyDialer() (net.Conn, error) {
+ return &net.TCPConn{}, nil
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go
new file mode 100644
index 000000000..a4e650847
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -0,0 +1,367 @@
+package pool
+
+import (
+ "errors"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/internal"
+)
+
+var ErrClosed = errors.New("redis: client is closed")
+var ErrPoolTimeout = errors.New("redis: connection pool timeout")
+
+var timers = sync.Pool{
+ New: func() interface{} {
+ t := time.NewTimer(time.Hour)
+ t.Stop()
+ return t
+ },
+}
+
+// Stats contains pool state information and accumulated stats.
+type Stats struct {
+ Requests uint32 // number of times a connection was requested by the pool
+ Hits uint32 // number of times free connection was found in the pool
+ Timeouts uint32 // number of times a wait timeout occurred
+
+ TotalConns uint32 // the number of total connections in the pool
+ FreeConns uint32 // the number of free connections in the pool
+}
+
+type Pooler interface {
+ NewConn() (*Conn, error)
+ CloseConn(*Conn) error
+
+ Get() (*Conn, bool, error)
+ Put(*Conn) error
+ Remove(*Conn) error
+
+ Len() int
+ FreeLen() int
+ Stats() *Stats
+
+ Close() error
+}
+
+type Options struct {
+ Dialer func() (net.Conn, error)
+ OnClose func(*Conn) error
+
+ PoolSize int
+ PoolTimeout time.Duration
+ IdleTimeout time.Duration
+ IdleCheckFrequency time.Duration
+}
+
+type ConnPool struct {
+ opt *Options
+
+ dialErrorsNum uint32 // atomic
+ _lastDialError atomic.Value
+
+ queue chan struct{}
+
+ connsMu sync.Mutex
+ conns []*Conn
+
+ freeConnsMu sync.Mutex
+ freeConns []*Conn
+
+ stats Stats
+
+ _closed uint32 // atomic
+}
+
+var _ Pooler = (*ConnPool)(nil)
+
+func NewConnPool(opt *Options) *ConnPool {
+ p := &ConnPool{
+ opt: opt,
+
+ queue: make(chan struct{}, opt.PoolSize),
+ conns: make([]*Conn, 0, opt.PoolSize),
+ freeConns: make([]*Conn, 0, opt.PoolSize),
+ }
+ if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
+ go p.reaper(opt.IdleCheckFrequency)
+ }
+ return p
+}
+
+func (p *ConnPool) NewConn() (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
+ return nil, p.lastDialError()
+ }
+
+ netConn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
+ go p.tryDial()
+ }
+ return nil, err
+ }
+
+ cn := NewConn(netConn)
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.connsMu.Unlock()
+
+ return cn, nil
+}
+
+func (p *ConnPool) tryDial() {
+ for {
+ conn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ time.Sleep(time.Second)
+ continue
+ }
+
+ atomic.StoreUint32(&p.dialErrorsNum, 0)
+ _ = conn.Close()
+ return
+ }
+}
+
+func (p *ConnPool) setLastDialError(err error) {
+ p._lastDialError.Store(err)
+}
+
+func (p *ConnPool) lastDialError() error {
+ return p._lastDialError.Load().(error)
+}
+
+// Get returns existed connection from the pool or creates a new one.
+func (p *ConnPool) Get() (*Conn, bool, error) {
+ if p.closed() {
+ return nil, false, ErrClosed
+ }
+
+ atomic.AddUint32(&p.stats.Requests, 1)
+
+ select {
+ case p.queue <- struct{}{}:
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return nil, false, ErrPoolTimeout
+ }
+ }
+
+ for {
+ p.freeConnsMu.Lock()
+ cn := p.popFree()
+ p.freeConnsMu.Unlock()
+
+ if cn == nil {
+ break
+ }
+
+ if cn.IsStale(p.opt.IdleTimeout) {
+ p.CloseConn(cn)
+ continue
+ }
+
+ atomic.AddUint32(&p.stats.Hits, 1)
+ return cn, false, nil
+ }
+
+ newcn, err := p.NewConn()
+ if err != nil {
+ <-p.queue
+ return nil, false, err
+ }
+
+ return newcn, true, nil
+}
+
+func (p *ConnPool) popFree() *Conn {
+ if len(p.freeConns) == 0 {
+ return nil
+ }
+
+ idx := len(p.freeConns) - 1
+ cn := p.freeConns[idx]
+ p.freeConns = p.freeConns[:idx]
+ return cn
+}
+
+func (p *ConnPool) Put(cn *Conn) error {
+ if data := cn.Rd.PeekBuffered(); data != nil {
+ internal.Logf("connection has unread data: %q", data)
+ return p.Remove(cn)
+ }
+ p.freeConnsMu.Lock()
+ p.freeConns = append(p.freeConns, cn)
+ p.freeConnsMu.Unlock()
+ <-p.queue
+ return nil
+}
+
+func (p *ConnPool) Remove(cn *Conn) error {
+ _ = p.CloseConn(cn)
+ <-p.queue
+ return nil
+}
+
+func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.connsMu.Lock()
+ for i, c := range p.conns {
+ if c == cn {
+ p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ break
+ }
+ }
+ p.connsMu.Unlock()
+
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) closeConn(cn *Conn) error {
+ if p.opt.OnClose != nil {
+ _ = p.opt.OnClose(cn)
+ }
+ return cn.Close()
+}
+
+// Len returns total number of connections.
+func (p *ConnPool) Len() int {
+ p.connsMu.Lock()
+ l := len(p.conns)
+ p.connsMu.Unlock()
+ return l
+}
+
+// FreeLen returns number of free connections.
+func (p *ConnPool) FreeLen() int {
+ p.freeConnsMu.Lock()
+ l := len(p.freeConns)
+ p.freeConnsMu.Unlock()
+ return l
+}
+
+func (p *ConnPool) Stats() *Stats {
+ return &Stats{
+ Requests: atomic.LoadUint32(&p.stats.Requests),
+ Hits: atomic.LoadUint32(&p.stats.Hits),
+ Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+ TotalConns: uint32(p.Len()),
+ FreeConns: uint32(p.FreeLen()),
+ }
+}
+
+func (p *ConnPool) closed() bool {
+ return atomic.LoadUint32(&p._closed) == 1
+}
+
+func (p *ConnPool) Filter(fn func(*Conn) bool) error {
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if fn(cn) {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+ p.connsMu.Unlock()
+ return firstErr
+}
+
+func (p *ConnPool) Close() error {
+ if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
+ return ErrClosed
+ }
+
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ p.conns = nil
+ p.connsMu.Unlock()
+
+ p.freeConnsMu.Lock()
+ p.freeConns = nil
+ p.freeConnsMu.Unlock()
+
+ return firstErr
+}
+
+func (p *ConnPool) reapStaleConn() bool {
+ if len(p.freeConns) == 0 {
+ return false
+ }
+
+ cn := p.freeConns[0]
+ if !cn.IsStale(p.opt.IdleTimeout) {
+ return false
+ }
+
+ p.CloseConn(cn)
+ p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
+
+ return true
+}
+
+func (p *ConnPool) ReapStaleConns() (int, error) {
+ var n int
+ for {
+ p.queue <- struct{}{}
+ p.freeConnsMu.Lock()
+
+ reaped := p.reapStaleConn()
+
+ p.freeConnsMu.Unlock()
+ <-p.queue
+
+ if reaped {
+ n++
+ } else {
+ break
+ }
+ }
+ return n, nil
+}
+
+func (p *ConnPool) reaper(frequency time.Duration) {
+ ticker := time.NewTicker(frequency)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ if p.closed() {
+ break
+ }
+ n, err := p.ReapStaleConns()
+ if err != nil {
+ internal.Logf("ReapStaleConns failed: %s", err)
+ continue
+ }
+ s := p.Stats()
+ internal.Logf(
+ "reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
+ n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
+ )
+ }
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
new file mode 100644
index 000000000..ff91279b3
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
@@ -0,0 +1,55 @@
+package pool
+
+type SingleConnPool struct {
+ cn *Conn
+}
+
+var _ Pooler = (*SingleConnPool)(nil)
+
+func NewSingleConnPool(cn *Conn) *SingleConnPool {
+ return &SingleConnPool{
+ cn: cn,
+ }
+}
+
+func (p *SingleConnPool) NewConn() (*Conn, error) {
+ panic("not implemented")
+}
+
+func (p *SingleConnPool) CloseConn(*Conn) error {
+ panic("not implemented")
+}
+
+func (p *SingleConnPool) Get() (*Conn, bool, error) {
+ return p.cn, false, nil
+}
+
+func (p *SingleConnPool) Put(cn *Conn) error {
+ if p.cn != cn {
+ panic("p.cn != cn")
+ }
+ return nil
+}
+
+func (p *SingleConnPool) Remove(cn *Conn) error {
+ if p.cn != cn {
+ panic("p.cn != cn")
+ }
+ return nil
+}
+
+func (p *SingleConnPool) Len() int {
+ return 1
+}
+
+func (p *SingleConnPool) FreeLen() int {
+ return 0
+}
+
+func (p *SingleConnPool) Stats() *Stats {
+ return nil
+}
+
+func (p *SingleConnPool) Close() error {
+ return nil
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
new file mode 100644
index 000000000..17f163858
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
@@ -0,0 +1,123 @@
+package pool
+
+import "sync"
+
+type StickyConnPool struct {
+ pool *ConnPool
+ reusable bool
+
+ cn *Conn
+ closed bool
+ mu sync.Mutex
+}
+
+var _ Pooler = (*StickyConnPool)(nil)
+
+func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool {
+ return &StickyConnPool{
+ pool: pool,
+ reusable: reusable,
+ }
+}
+
+func (p *StickyConnPool) NewConn() (*Conn, error) {
+ panic("not implemented")
+}
+
+func (p *StickyConnPool) CloseConn(*Conn) error {
+ panic("not implemented")
+}
+
+func (p *StickyConnPool) Get() (*Conn, bool, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return nil, false, ErrClosed
+ }
+ if p.cn != nil {
+ return p.cn, false, nil
+ }
+
+ cn, _, err := p.pool.Get()
+ if err != nil {
+ return nil, false, err
+ }
+ p.cn = cn
+ return cn, true, nil
+}
+
+func (p *StickyConnPool) putUpstream() (err error) {
+ err = p.pool.Put(p.cn)
+ p.cn = nil
+ return err
+}
+
+func (p *StickyConnPool) Put(cn *Conn) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return ErrClosed
+ }
+ return nil
+}
+
+func (p *StickyConnPool) removeUpstream() error {
+ err := p.pool.Remove(p.cn)
+ p.cn = nil
+ return err
+}
+
+func (p *StickyConnPool) Remove(cn *Conn) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return nil
+ }
+ return p.removeUpstream()
+}
+
+func (p *StickyConnPool) Len() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.cn == nil {
+ return 0
+ }
+ return 1
+}
+
+func (p *StickyConnPool) FreeLen() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.cn == nil {
+ return 1
+ }
+ return 0
+}
+
+func (p *StickyConnPool) Stats() *Stats {
+ return nil
+}
+
+func (p *StickyConnPool) Close() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return ErrClosed
+ }
+ p.closed = true
+ var err error
+ if p.cn != nil {
+ if p.reusable {
+ err = p.putUpstream()
+ } else {
+ err = p.removeUpstream()
+ }
+ }
+ return err
+}
diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_test.go b/vendor/github.com/go-redis/redis/internal/pool/pool_test.go
new file mode 100644
index 000000000..68c9a1bef
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/pool/pool_test.go
@@ -0,0 +1,241 @@
+package pool_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/go-redis/redis/internal/pool"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("ConnPool", func() {
+ var connPool *pool.ConnPool
+
+ BeforeEach(func() {
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ PoolTimeout: time.Hour,
+ IdleTimeout: time.Millisecond,
+ IdleCheckFrequency: time.Millisecond,
+ })
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ It("should unblock client when conn is removed", func() {
+ // Reserve one connection.
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+
+ // Reserve all other connections.
+ var cns []*pool.Conn
+ for i := 0; i < 9; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ cns = append(cns, cn)
+ }
+
+ started := make(chan bool, 1)
+ done := make(chan bool, 1)
+ go func() {
+ defer GinkgoRecover()
+
+ started <- true
+ _, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ done <- true
+
+ err = connPool.Put(cn)
+ Expect(err).NotTo(HaveOccurred())
+ }()
+ <-started
+
+ // Check that Get is blocked.
+ select {
+ case <-done:
+ Fail("Get is not blocked")
+ default:
+ // ok
+ }
+
+ err = connPool.Remove(cn)
+ Expect(err).NotTo(HaveOccurred())
+
+ // Check that Ping is unblocked.
+ select {
+ case <-done:
+ // ok
+ case <-time.After(time.Second):
+ Fail("Get is not unblocked")
+ }
+
+ for _, cn := range cns {
+ err = connPool.Put(cn)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ })
+})
+
+var _ = Describe("conns reaper", func() {
+ const idleTimeout = time.Minute
+
+ var connPool *pool.ConnPool
+ var conns, idleConns, closedConns []*pool.Conn
+
+ BeforeEach(func() {
+ conns = nil
+ closedConns = nil
+
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ PoolTimeout: time.Second,
+ IdleTimeout: idleTimeout,
+ IdleCheckFrequency: time.Hour,
+
+ OnClose: func(cn *pool.Conn) error {
+ closedConns = append(closedConns, cn)
+ return nil
+ },
+ })
+
+ // add stale connections
+ idleConns = nil
+ for i := 0; i < 3; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
+ conns = append(conns, cn)
+ idleConns = append(idleConns, cn)
+ }
+
+ // add fresh connections
+ for i := 0; i < 3; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ conns = append(conns, cn)
+ }
+
+ for _, cn := range conns {
+ Expect(connPool.Put(cn)).NotTo(HaveOccurred())
+ }
+
+ Expect(connPool.Len()).To(Equal(6))
+ Expect(connPool.FreeLen()).To(Equal(6))
+
+ n, err := connPool.ReapStaleConns()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(3))
+ })
+
+ AfterEach(func() {
+ _ = connPool.Close()
+ Expect(connPool.Len()).To(Equal(0))
+ Expect(connPool.FreeLen()).To(Equal(0))
+ Expect(len(closedConns)).To(Equal(len(conns)))
+ Expect(closedConns).To(ConsistOf(conns))
+ })
+
+ It("reaps stale connections", func() {
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.FreeLen()).To(Equal(3))
+ })
+
+ It("does not reap fresh connections", func() {
+ n, err := connPool.ReapStaleConns()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(0))
+ })
+
+ It("stale connections are closed", func() {
+ Expect(len(closedConns)).To(Equal(len(idleConns)))
+ Expect(closedConns).To(ConsistOf(idleConns))
+ })
+
+ It("pool is functional", func() {
+ for j := 0; j < 3; j++ {
+ var freeCns []*pool.Conn
+ for i := 0; i < 3; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(cn).NotTo(BeNil())
+ freeCns = append(freeCns, cn)
+ }
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.FreeLen()).To(Equal(0))
+
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(cn).NotTo(BeNil())
+ conns = append(conns, cn)
+
+ Expect(connPool.Len()).To(Equal(4))
+ Expect(connPool.FreeLen()).To(Equal(0))
+
+ err = connPool.Remove(cn)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.FreeLen()).To(Equal(0))
+
+ for _, cn := range freeCns {
+ err := connPool.Put(cn)
+ Expect(err).NotTo(HaveOccurred())
+ }
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.FreeLen()).To(Equal(3))
+ }
+ })
+})
+
+var _ = Describe("race", func() {
+ var connPool *pool.ConnPool
+ var C, N int
+
+ BeforeEach(func() {
+ C, N = 10, 1000
+ if testing.Short() {
+ C = 4
+ N = 100
+ }
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ It("does not happen on Get, Put, and Remove", func() {
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ PoolTimeout: time.Minute,
+ IdleTimeout: time.Millisecond,
+ IdleCheckFrequency: time.Millisecond,
+ })
+
+ perform(C, func(id int) {
+ for i := 0; i < N; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ if err == nil {
+ Expect(connPool.Put(cn)).NotTo(HaveOccurred())
+ }
+ }
+ }, func(id int) {
+ for i := 0; i < N; i++ {
+ cn, _, err := connPool.Get()
+ Expect(err).NotTo(HaveOccurred())
+ if err == nil {
+ Expect(connPool.Remove(cn)).NotTo(HaveOccurred())
+ }
+ }
+ })
+ })
+})
diff --git a/vendor/github.com/go-redis/redis/internal/proto/proto_test.go b/vendor/github.com/go-redis/redis/internal/proto/proto_test.go
new file mode 100644
index 000000000..c9a820eb1
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/proto_test.go
@@ -0,0 +1,13 @@
+package proto_test
+
+import (
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestGinkgoSuite(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "proto")
+}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader.go b/vendor/github.com/go-redis/redis/internal/proto/reader.go
new file mode 100644
index 000000000..2159cf639
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -0,0 +1,334 @@
+package proto
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/go-redis/redis/internal"
+)
+
+const bytesAllocLimit = 1024 * 1024 // 1mb
+
+const (
+ ErrorReply = '-'
+ StatusReply = '+'
+ IntReply = ':'
+ StringReply = '$'
+ ArrayReply = '*'
+)
+
+type MultiBulkParse func(*Reader, int64) (interface{}, error)
+
+type Reader struct {
+ src *bufio.Reader
+ buf []byte
+}
+
+func NewReader(rd io.Reader) *Reader {
+ return &Reader{
+ src: bufio.NewReader(rd),
+ buf: make([]byte, 4096),
+ }
+}
+
+func (r *Reader) Reset(rd io.Reader) {
+ r.src.Reset(rd)
+}
+
+func (p *Reader) PeekBuffered() []byte {
+ if n := p.src.Buffered(); n != 0 {
+ b, _ := p.src.Peek(n)
+ return b
+ }
+ return nil
+}
+
+func (p *Reader) ReadN(n int) ([]byte, error) {
+ b, err := readN(p.src, p.buf, n)
+ if err != nil {
+ return nil, err
+ }
+ p.buf = b
+ return b, nil
+}
+
+func (p *Reader) ReadLine() ([]byte, error) {
+ line, isPrefix, err := p.src.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ if isPrefix {
+ return nil, bufio.ErrBufferFull
+ }
+ if len(line) == 0 {
+ return nil, internal.RedisError("redis: reply is empty")
+ }
+ if isNilReply(line) {
+ return nil, internal.Nil
+ }
+ return line, nil
+}
+
+func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
+ line, err := p.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StatusReply:
+ return parseStatusValue(line), nil
+ case IntReply:
+ return parseInt(line[1:], 10, 64)
+ case StringReply:
+ return p.readTmpBytesValue(line)
+ case ArrayReply:
+ n, err := parseArrayLen(line)
+ if err != nil {
+ return nil, err
+ }
+ return m(p, n)
+ }
+ return nil, fmt.Errorf("redis: can't parse %.100q", line)
+}
+
+func (p *Reader) ReadIntReply() (int64, error) {
+ line, err := p.ReadLine()
+ if err != nil {
+ return 0, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return 0, ParseErrorReply(line)
+ case IntReply:
+ return parseInt(line[1:], 10, 64)
+ default:
+ return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
+ }
+}
+
+func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
+ line, err := p.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StringReply:
+ return p.readTmpBytesValue(line)
+ case StatusReply:
+ return parseStatusValue(line), nil
+ default:
+ return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
+ }
+}
+
+func (r *Reader) ReadBytesReply() ([]byte, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return nil, err
+ }
+ cp := make([]byte, len(b))
+ copy(cp, b)
+ return cp, nil
+}
+
+func (p *Reader) ReadStringReply() (string, error) {
+ b, err := p.ReadTmpBytesReply()
+ if err != nil {
+ return "", err
+ }
+ return string(b), nil
+}
+
+func (p *Reader) ReadFloatReply() (float64, error) {
+ b, err := p.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return parseFloat(b, 64)
+}
+
+func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
+ line, err := p.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case ArrayReply:
+ n, err := parseArrayLen(line)
+ if err != nil {
+ return nil, err
+ }
+ return m(p, n)
+ default:
+ return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
+ }
+}
+
+func (p *Reader) ReadArrayLen() (int64, error) {
+ line, err := p.ReadLine()
+ if err != nil {
+ return 0, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return 0, ParseErrorReply(line)
+ case ArrayReply:
+ return parseArrayLen(line)
+ default:
+ return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
+ }
+}
+
+func (p *Reader) ReadScanReply() ([]string, uint64, error) {
+ n, err := p.ReadArrayLen()
+ if err != nil {
+ return nil, 0, err
+ }
+ if n != 2 {
+ return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
+ }
+
+ cursor, err := p.ReadUint()
+ if err != nil {
+ return nil, 0, err
+ }
+
+ n, err = p.ReadArrayLen()
+ if err != nil {
+ return nil, 0, err
+ }
+
+ keys := make([]string, n)
+ for i := int64(0); i < n; i++ {
+ key, err := p.ReadStringReply()
+ if err != nil {
+ return nil, 0, err
+ }
+ keys[i] = key
+ }
+
+ return keys, cursor, err
+}
+
+func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
+ if isNilReply(line) {
+ return nil, internal.Nil
+ }
+
+ replyLen, err := strconv.Atoi(string(line[1:]))
+ if err != nil {
+ return nil, err
+ }
+
+ b, err := p.ReadN(replyLen + 2)
+ if err != nil {
+ return nil, err
+ }
+ return b[:replyLen], nil
+}
+
+func (r *Reader) ReadInt() (int64, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return parseInt(b, 10, 64)
+}
+
+func (r *Reader) ReadUint() (uint64, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return parseUint(b, 10, 64)
+}
+
+// --------------------------------------------------------------------
+
+func readN(r io.Reader, b []byte, n int) ([]byte, error) {
+ if n == 0 && b == nil {
+ return make([]byte, 0), nil
+ }
+
+ if cap(b) >= n {
+ b = b[:n]
+ _, err := io.ReadFull(r, b)
+ return b, err
+ }
+ b = b[:cap(b)]
+
+ pos := 0
+ for pos < n {
+ diff := n - len(b)
+ if diff > bytesAllocLimit {
+ diff = bytesAllocLimit
+ }
+ b = append(b, make([]byte, diff)...)
+
+ nn, err := io.ReadFull(r, b[pos:])
+ if err != nil {
+ return nil, err
+ }
+ pos += nn
+ }
+
+ return b, nil
+}
+
+func formatInt(n int64) string {
+ return strconv.FormatInt(n, 10)
+}
+
+func formatUint(u uint64) string {
+ return strconv.FormatUint(u, 10)
+}
+
+func formatFloat(f float64) string {
+ return strconv.FormatFloat(f, 'f', -1, 64)
+}
+
+func isNilReply(b []byte) bool {
+ return len(b) == 3 &&
+ (b[0] == StringReply || b[0] == ArrayReply) &&
+ b[1] == '-' && b[2] == '1'
+}
+
+func ParseErrorReply(line []byte) error {
+ return internal.RedisError(string(line[1:]))
+}
+
+func parseStatusValue(line []byte) []byte {
+ return line[1:]
+}
+
+func parseArrayLen(line []byte) (int64, error) {
+ if isNilReply(line) {
+ return 0, internal.Nil
+ }
+ return parseInt(line[1:], 10, 64)
+}
+
+func atoi(b []byte) (int, error) {
+ return strconv.Atoi(internal.BytesToString(b))
+}
+
+func parseInt(b []byte, base int, bitSize int) (int64, error) {
+ return strconv.ParseInt(internal.BytesToString(b), base, bitSize)
+}
+
+func parseUint(b []byte, base int, bitSize int) (uint64, error) {
+ return strconv.ParseUint(internal.BytesToString(b), base, bitSize)
+}
+
+func parseFloat(b []byte, bitSize int) (float64, error) {
+ return strconv.ParseFloat(internal.BytesToString(b), bitSize)
+}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/reader_test.go b/vendor/github.com/go-redis/redis/internal/proto/reader_test.go
new file mode 100644
index 000000000..8d2d71be9
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/reader_test.go
@@ -0,0 +1,87 @@
+package proto_test
+
+import (
+ "bytes"
+ "strings"
+ "testing"
+
+ "github.com/go-redis/redis/internal/proto"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Reader", func() {
+
+ It("should read n bytes", func() {
+ data, err := proto.NewReader(strings.NewReader("ABCDEFGHIJKLMNO")).ReadN(10)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(len(data)).To(Equal(10))
+ Expect(string(data)).To(Equal("ABCDEFGHIJ"))
+
+ data, err = proto.NewReader(strings.NewReader(strings.Repeat("x", 8192))).ReadN(6000)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(len(data)).To(Equal(6000))
+ })
+
+ It("should read lines", func() {
+ p := proto.NewReader(strings.NewReader("$5\r\nhello\r\n"))
+
+ data, err := p.ReadLine()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(string(data)).To(Equal("$5"))
+
+ data, err = p.ReadLine()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(string(data)).To(Equal("hello"))
+ })
+
+})
+
+func BenchmarkReader_ParseReply_Status(b *testing.B) {
+ benchmarkParseReply(b, "+OK\r\n", nil, false)
+}
+
+func BenchmarkReader_ParseReply_Int(b *testing.B) {
+ benchmarkParseReply(b, ":1\r\n", nil, false)
+}
+
+func BenchmarkReader_ParseReply_Error(b *testing.B) {
+ benchmarkParseReply(b, "-Error message\r\n", nil, true)
+}
+
+func BenchmarkReader_ParseReply_String(b *testing.B) {
+ benchmarkParseReply(b, "$5\r\nhello\r\n", nil, false)
+}
+
+func BenchmarkReader_ParseReply_Slice(b *testing.B) {
+ benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", multiBulkParse, false)
+}
+
+func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wanterr bool) {
+ buf := new(bytes.Buffer)
+ for i := 0; i < b.N; i++ {
+ buf.WriteString(reply)
+ }
+ p := proto.NewReader(buf)
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ _, err := p.ReadReply(m)
+ if !wanterr && err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func multiBulkParse(p *proto.Reader, n int64) (interface{}, error) {
+ vv := make([]interface{}, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := p.ReadReply(multiBulkParse)
+ if err != nil {
+ return nil, err
+ }
+ vv = append(vv, v)
+ }
+ return vv, nil
+}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan.go b/vendor/github.com/go-redis/redis/internal/proto/scan.go
new file mode 100644
index 000000000..3ab40b94f
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/scan.go
@@ -0,0 +1,131 @@
+package proto
+
+import (
+ "encoding"
+ "fmt"
+ "reflect"
+
+ "github.com/go-redis/redis/internal"
+)
+
+func Scan(b []byte, v interface{}) error {
+ switch v := v.(type) {
+ case nil:
+ return internal.RedisError("redis: Scan(nil)")
+ case *string:
+ *v = internal.BytesToString(b)
+ return nil
+ case *[]byte:
+ *v = b
+ return nil
+ case *int:
+ var err error
+ *v, err = atoi(b)
+ return err
+ case *int8:
+ n, err := parseInt(b, 10, 8)
+ if err != nil {
+ return err
+ }
+ *v = int8(n)
+ return nil
+ case *int16:
+ n, err := parseInt(b, 10, 16)
+ if err != nil {
+ return err
+ }
+ *v = int16(n)
+ return nil
+ case *int32:
+ n, err := parseInt(b, 10, 32)
+ if err != nil {
+ return err
+ }
+ *v = int32(n)
+ return nil
+ case *int64:
+ n, err := parseInt(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = n
+ return nil
+ case *uint:
+ n, err := parseUint(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = uint(n)
+ return nil
+ case *uint8:
+ n, err := parseUint(b, 10, 8)
+ if err != nil {
+ return err
+ }
+ *v = uint8(n)
+ return nil
+ case *uint16:
+ n, err := parseUint(b, 10, 16)
+ if err != nil {
+ return err
+ }
+ *v = uint16(n)
+ return nil
+ case *uint32:
+ n, err := parseUint(b, 10, 32)
+ if err != nil {
+ return err
+ }
+ *v = uint32(n)
+ return nil
+ case *uint64:
+ n, err := parseUint(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = n
+ return nil
+ case *float32:
+ n, err := parseFloat(b, 32)
+ if err != nil {
+ return err
+ }
+ *v = float32(n)
+ return err
+ case *float64:
+ var err error
+ *v, err = parseFloat(b, 64)
+ return err
+ case *bool:
+ *v = len(b) == 1 && b[0] == '1'
+ return nil
+ case encoding.BinaryUnmarshaler:
+ return v.UnmarshalBinary(b)
+ default:
+ return fmt.Errorf(
+ "redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", v)
+ }
+}
+
+func ScanSlice(data []string, slice interface{}) error {
+ v := reflect.ValueOf(slice)
+ if !v.IsValid() {
+ return fmt.Errorf("redis: ScanSlice(nil)")
+ }
+ if v.Kind() != reflect.Ptr {
+ return fmt.Errorf("redis: ScanSlice(non-pointer %T)", slice)
+ }
+ v = v.Elem()
+ if v.Kind() != reflect.Slice {
+ return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice)
+ }
+
+ for i, s := range data {
+ elem := internal.SliceNextElem(v)
+ if err := Scan(internal.StringToBytes(s), elem.Addr().Interface()); err != nil {
+ return fmt.Errorf("redis: ScanSlice(index=%d value=%q) failed: %s", i, s, err)
+ }
+ }
+
+ return nil
+}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/scan_test.go b/vendor/github.com/go-redis/redis/internal/proto/scan_test.go
new file mode 100644
index 000000000..fadcd0561
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/scan_test.go
@@ -0,0 +1,48 @@
+package proto
+
+import (
+ "encoding/json"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+type testScanSliceStruct struct {
+ ID int
+ Name string
+}
+
+func (s *testScanSliceStruct) MarshalBinary() ([]byte, error) {
+ return json.Marshal(s)
+}
+
+func (s *testScanSliceStruct) UnmarshalBinary(b []byte) error {
+ return json.Unmarshal(b, s)
+}
+
+var _ = Describe("ScanSlice", func() {
+ data := []string{
+ `{"ID":-1,"Name":"Back Yu"}`,
+ `{"ID":1,"Name":"szyhf"}`,
+ }
+
+ It("[]testScanSliceStruct", func() {
+ var slice []testScanSliceStruct
+ err := ScanSlice(data, &slice)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(slice).To(Equal([]testScanSliceStruct{
+ {-1, "Back Yu"},
+ {1, "szyhf"},
+ }))
+ })
+
+ It("var testContainer []*testScanSliceStruct", func() {
+ var slice []*testScanSliceStruct
+ err := ScanSlice(data, &slice)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(slice).To(Equal([]*testScanSliceStruct{
+ {-1, "Back Yu"},
+ {1, "szyhf"},
+ }))
+ })
+})
diff --git a/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
new file mode 100644
index 000000000..096b6d76a
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
@@ -0,0 +1,103 @@
+package proto
+
+import (
+ "encoding"
+ "fmt"
+ "strconv"
+)
+
+type WriteBuffer struct {
+ b []byte
+}
+
+func NewWriteBuffer() *WriteBuffer {
+ return &WriteBuffer{
+ b: make([]byte, 0, 4096),
+ }
+}
+
+func (w *WriteBuffer) Len() int { return len(w.b) }
+func (w *WriteBuffer) Bytes() []byte { return w.b }
+func (w *WriteBuffer) Reset() { w.b = w.b[:0] }
+
+func (w *WriteBuffer) Append(args []interface{}) error {
+ w.b = append(w.b, ArrayReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(args)), 10)
+ w.b = append(w.b, '\r', '\n')
+
+ for _, arg := range args {
+ if err := w.append(arg); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (w *WriteBuffer) append(val interface{}) error {
+ switch v := val.(type) {
+ case nil:
+ w.AppendString("")
+ case string:
+ w.AppendString(v)
+ case []byte:
+ w.AppendBytes(v)
+ case int:
+ w.AppendString(formatInt(int64(v)))
+ case int8:
+ w.AppendString(formatInt(int64(v)))
+ case int16:
+ w.AppendString(formatInt(int64(v)))
+ case int32:
+ w.AppendString(formatInt(int64(v)))
+ case int64:
+ w.AppendString(formatInt(v))
+ case uint:
+ w.AppendString(formatUint(uint64(v)))
+ case uint8:
+ w.AppendString(formatUint(uint64(v)))
+ case uint16:
+ w.AppendString(formatUint(uint64(v)))
+ case uint32:
+ w.AppendString(formatUint(uint64(v)))
+ case uint64:
+ w.AppendString(formatUint(v))
+ case float32:
+ w.AppendString(formatFloat(float64(v)))
+ case float64:
+ w.AppendString(formatFloat(v))
+ case bool:
+ if v {
+ w.AppendString("1")
+ } else {
+ w.AppendString("0")
+ }
+ default:
+ if bm, ok := val.(encoding.BinaryMarshaler); ok {
+ bb, err := bm.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ w.AppendBytes(bb)
+ } else {
+ return fmt.Errorf(
+ "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val)
+ }
+ }
+ return nil
+}
+
+func (w *WriteBuffer) AppendString(s string) {
+ w.b = append(w.b, StringReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(s)), 10)
+ w.b = append(w.b, '\r', '\n')
+ w.b = append(w.b, s...)
+ w.b = append(w.b, '\r', '\n')
+}
+
+func (w *WriteBuffer) AppendBytes(p []byte) {
+ w.b = append(w.b, StringReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(p)), 10)
+ w.b = append(w.b, '\r', '\n')
+ w.b = append(w.b, p...)
+ w.b = append(w.b, '\r', '\n')
+}
diff --git a/vendor/github.com/go-redis/redis/internal/proto/write_buffer_test.go b/vendor/github.com/go-redis/redis/internal/proto/write_buffer_test.go
new file mode 100644
index 000000000..84799ff3b
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/proto/write_buffer_test.go
@@ -0,0 +1,63 @@
+package proto_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/go-redis/redis/internal/proto"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("WriteBuffer", func() {
+ var buf *proto.WriteBuffer
+
+ BeforeEach(func() {
+ buf = proto.NewWriteBuffer()
+ })
+
+ It("should reset", func() {
+ buf.AppendString("string")
+ Expect(buf.Len()).To(Equal(12))
+ buf.Reset()
+ Expect(buf.Len()).To(Equal(0))
+ })
+
+ It("should append args", func() {
+ err := buf.Append([]interface{}{
+ "string",
+ 12,
+ 34.56,
+ []byte{'b', 'y', 't', 'e', 's'},
+ true,
+ nil,
+ })
+ Expect(err).NotTo(HaveOccurred())
+ Expect(buf.Bytes()).To(Equal([]byte("*6\r\n" +
+ "$6\r\nstring\r\n" +
+ "$2\r\n12\r\n" +
+ "$5\r\n34.56\r\n" +
+ "$5\r\nbytes\r\n" +
+ "$1\r\n1\r\n" +
+ "$0\r\n" +
+ "\r\n")))
+ })
+
+ It("should append marshalable args", func() {
+ err := buf.Append([]interface{}{time.Unix(1414141414, 0)})
+ Expect(err).NotTo(HaveOccurred())
+ Expect(buf.Len()).To(Equal(26))
+ })
+
+})
+
+func BenchmarkWriteBuffer_Append(b *testing.B) {
+ buf := proto.NewWriteBuffer()
+ args := []interface{}{"hello", "world", "foo", "bar"}
+
+ for i := 0; i < b.N; i++ {
+ buf.Append(args)
+ buf.Reset()
+ }
+}
diff --git a/vendor/github.com/go-redis/redis/internal/safe.go b/vendor/github.com/go-redis/redis/internal/safe.go
new file mode 100644
index 000000000..870fe541f
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/safe.go
@@ -0,0 +1,11 @@
+// +build appengine
+
+package internal
+
+func BytesToString(b []byte) string {
+ return string(b)
+}
+
+func StringToBytes(s string) []byte {
+ return []byte(s)
+}
diff --git a/vendor/github.com/go-redis/redis/internal/unsafe.go b/vendor/github.com/go-redis/redis/internal/unsafe.go
new file mode 100644
index 000000000..c18b25c17
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/unsafe.go
@@ -0,0 +1,27 @@
+// +build !appengine
+
+package internal
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+func BytesToString(b []byte) string {
+ bytesHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ strHeader := reflect.StringHeader{
+ Data: bytesHeader.Data,
+ Len: bytesHeader.Len,
+ }
+ return *(*string)(unsafe.Pointer(&strHeader))
+}
+
+func StringToBytes(s string) []byte {
+ sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+ bh := reflect.SliceHeader{
+ Data: sh.Data,
+ Len: sh.Len,
+ Cap: sh.Len,
+ }
+ return *(*[]byte)(unsafe.Pointer(&bh))
+}
diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go
new file mode 100644
index 000000000..520596fd9
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/internal/util.go
@@ -0,0 +1,47 @@
+package internal
+
+import "reflect"
+
+func ToLower(s string) string {
+ if isLower(s) {
+ return s
+ }
+
+ b := make([]byte, len(s))
+ for i := range b {
+ c := s[i]
+ if c >= 'A' && c <= 'Z' {
+ c += 'a' - 'A'
+ }
+ b[i] = c
+ }
+ return BytesToString(b)
+}
+
+func isLower(s string) bool {
+ for i := 0; i < len(s); i++ {
+ c := s[i]
+ if c >= 'A' && c <= 'Z' {
+ return false
+ }
+ }
+ return true
+}
+
+func SliceNextElem(v reflect.Value) reflect.Value {
+ if v.Len() < v.Cap() {
+ v.Set(v.Slice(0, v.Len()+1))
+ return v.Index(v.Len() - 1)
+ }
+
+ elemType := v.Type().Elem()
+
+ if elemType.Kind() == reflect.Ptr {
+ elem := reflect.New(elemType.Elem())
+ v.Set(reflect.Append(v, elem))
+ return elem.Elem()
+ }
+
+ v.Set(reflect.Append(v, reflect.Zero(elemType)))
+ return v.Index(v.Len() - 1)
+}