From 61e27beabc9804fdcf59ed9df2180802175a4f70 Mon Sep 17 00:00:00 2001
From: Christopher Speller
Date: Tue, 28 Aug 2018 10:05:26 -0700
Subject: Updating dependencies. (#9303)

---
 vendor/github.com/go-redis/redis/CHANGELOG.md      |  21 +
 vendor/github.com/go-redis/redis/README.md         |  33 +-
 vendor/github.com/go-redis/redis/cluster.go        | 325 ++++---
 vendor/github.com/go-redis/redis/command.go        | 956 +++++++++++++++++++--
 vendor/github.com/go-redis/redis/commands.go       | 296 ++++++-
 vendor/github.com/go-redis/redis/internal/error.go |  29 +-
 .../go-redis/redis/internal/hashtag/hashtag.go     |   6 +-
 .../go-redis/redis/internal/pool/conn.go           |  41 +-
 .../go-redis/redis/internal/pool/pool.go           | 132 ++-
 .../go-redis/redis/internal/proto/reader.go        | 172 ++--
 .../go-redis/redis/internal/proto/write_buffer.go  | 101 ---
 .../go-redis/redis/internal/proto/writer.go        | 159 ++++
 .../go-redis/redis/internal/util/safe.go           |   4 +
 .../go-redis/redis/internal/util/unsafe.go         |  10 +
 vendor/github.com/go-redis/redis/options.go        |  29 +-
 vendor/github.com/go-redis/redis/parser.go         | 394 ---------
 vendor/github.com/go-redis/redis/pipeline.go       |   1 +
 vendor/github.com/go-redis/redis/pubsub.go         | 290 ++++---
 vendor/github.com/go-redis/redis/redis.go          |  91 +-
 vendor/github.com/go-redis/redis/result.go         |   2 +-
 vendor/github.com/go-redis/redis/ring.go           |  79 +-
 vendor/github.com/go-redis/redis/sentinel.go       |  82 +-
 vendor/github.com/go-redis/redis/tx.go             |  12 +-
 vendor/github.com/go-redis/redis/universal.go      | 102 ++-
 24 files changed, 2238 insertions(+), 1129 deletions(-)
 create mode 100644 vendor/github.com/go-redis/redis/CHANGELOG.md
 delete mode 100644 vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
 create mode 100644 vendor/github.com/go-redis/redis/internal/proto/writer.go
 delete mode 100644 vendor/github.com/go-redis/redis/parser.go
(limited to 'vendor/github.com/go-redis/redis')

diff --git a/vendor/github.com/go-redis/redis/CHANGELOG.md b/vendor/github.com/go-redis/redis/CHANGELOG.md
new file mode 100644
index 000000000..7c40d5e38
--- /dev/null
+++ b/vendor/github.com/go-redis/redis/CHANGELOG.md
@@ -0,0 +1,21 @@
+# Changelog
+
+## v6.14
+
+- Added Options.MinIdleConns.
+- Added Options.MaxConnAge.
+- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
+- Added Client.Do to simplify creating custom commands.
+- Added Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
+- Lower memory usage.
+
+## v6.13
+
+- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better key distribution between shards.
+- Cluster client was optimized to use much less memory when reloading cluster state.
+- PubSub.ReceiveMessage is reworked to not use ReceiveTimeout, so it does not lose data when a timeout occurs. In most cases it is recommended to use PubSub.Channel instead.
+- Dialer.KeepAlive is set to 5 minutes by default.
+
+## v6.12
+
+- ClusterClient got a new option called `ClusterSlots`, which allows building a cluster of normal Redis servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup
diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md
index 9f349764a..7d05b4466 100644
--- a/vendor/github.com/go-redis/redis/README.md
+++ b/vendor/github.com/go-redis/redis/README.md
@@ -15,6 +15,7 @@ Supports:
 - [Timeouts](https://godoc.org/github.com/go-redis/redis#Options).
 - [Redis Sentinel](https://godoc.org/github.com/go-redis/redis#NewFailoverClient).
 - [Redis Cluster](https://godoc.org/github.com/go-redis/redis#NewClusterClient).
+- [Cluster of Redis Servers](https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup) without using cluster mode and Redis Sentinel.
 - [Ring](https://godoc.org/github.com/go-redis/redis#NewRing).
 - [Instrumentation](https://godoc.org/github.com/go-redis/redis#ex-package--Instrumentation).
 - [Cache friendly](https://github.com/go-redis/cache).
@@ -86,25 +87,27 @@ Please go through [examples](https://godoc.org/github.com/go-redis/redis#pkg-exa
 Some corner cases:
 
-    SET key value EX 10 NX
-    set, err := client.SetNX("key", "value", 10*time.Second).Result()
+```go
+// SET key value EX 10 NX
+set, err := client.SetNX("key", "value", 10*time.Second).Result()
 
-    SORT list LIMIT 0 2 ASC
-    vals, err := client.Sort("list", redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result()
+// SORT list LIMIT 0 2 ASC
+vals, err := client.Sort("list", redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result()
 
-    ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2
-    vals, err := client.ZRangeByScoreWithScores("zset", redis.ZRangeBy{
-        Min: "-inf",
-        Max: "+inf",
-        Offset: 0,
-        Count: 2,
-    }).Result()
+// ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2
+vals, err := client.ZRangeByScoreWithScores("zset", redis.ZRangeBy{
+	Min: "-inf",
+	Max: "+inf",
+	Offset: 0,
+	Count: 2,
+}).Result()
 
-    ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM
-    vals, err := client.ZInterStore("out", redis.ZStore{Weights: []int64{2, 3}}, "zset1", "zset2").Result()
+// ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM
+vals, err := client.ZInterStore("out", redis.ZStore{Weights: []int64{2, 3}}, "zset1", "zset2").Result()
 
-    EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello"
-    vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result()
+// EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello"
+vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result()
+```
 
 ## Benchmark
diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go
index 0c58c8532..6b05f1a56 100644
--- a/vendor/github.com/go-redis/redis/cluster.go
+++ b/vendor/github.com/go-redis/redis/cluster.go
@@ -8,7 +8,8 @@ import (
 	"math"
 	"math/rand"
 	"net"
-	"strings"
+	"runtime"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -30,7 +31,7 @@ type ClusterOptions struct {
 
 	// The maximum number of retries before giving up. Command is retried
 	// on network errors and MOVED/ASK redirects.
-	// Default is 8.
+	// Default is 8 retries.
 	MaxRedirects int
 
 	// Enables read-only commands on slave nodes.
@@ -39,16 +40,25 @@
 	// It automatically enables ReadOnly.
 	RouteByLatency bool
 
 	// Allows routing read-only commands to a random master or slave node.
+	// It automatically enables ReadOnly.
 	RouteRandomly bool
 
+	// Optional function that returns cluster slots information.
+	// It is useful for manually creating a cluster of standalone Redis servers
+	// and load-balancing read/write operations between masters and slaves.
+	// It can use a service like ZooKeeper to maintain configuration information
+	// and Cluster.ReloadState to manually trigger state reloading.
+	ClusterSlots func() ([]ClusterSlot, error)
+
 	// Following options are copied from Options struct.
OnConnect func(*Conn) error + Password string + MaxRetries int MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - Password string DialTimeout time.Duration ReadTimeout time.Duration @@ -56,6 +66,8 @@ type ClusterOptions struct { // PoolSize applies per cluster node and not for the whole cluster. PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -70,10 +82,14 @@ func (opt *ClusterOptions) init() { opt.MaxRedirects = 8 } - if opt.RouteByLatency { + if opt.RouteByLatency || opt.RouteRandomly { opt.ReadOnly = true } + if opt.PoolSize == 0 { + opt.PoolSize = 5 * runtime.NumCPU() + } + switch opt.ReadTimeout { case -1: opt.ReadTimeout = 0 @@ -117,10 +133,11 @@ func (opt *ClusterOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: disableIdleCheck, TLSConfig: opt.TLSConfig, @@ -160,10 +177,6 @@ func (n *clusterNode) Close() error { return n.Client.Close() } -func (n *clusterNode) Test() error { - return n.Client.ClusterInfo().Err() -} - func (n *clusterNode) updateLatency() { const probes = 10 @@ -330,7 +343,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) { node := newClusterNode(c.opt, addr) - return node, node.Test() + return node, nil }) c.mu.Lock() @@ -383,12 +396,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) { //------------------------------------------------------------------------------ +type clusterSlot struct { + start, end int + nodes []*clusterNode +} + +type clusterSlotSlice []*clusterSlot + +func (p clusterSlotSlice) Len() int { + return len(p) +} + +func (p clusterSlotSlice) Less(i, j int) bool { + return p[i].start < p[j].start +} + +func (p clusterSlotSlice) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + type clusterState struct { nodes *clusterNodes Masters []*clusterNode Slaves []*clusterNode - slots [][]*clusterNode + slots []*clusterSlot generation uint32 createdAt time.Time @@ -400,19 +432,21 @@ func newClusterState( c := clusterState{ nodes: nodes, - slots: make([][]*clusterNode, hashtag.SlotNumber), + slots: make([]*clusterSlot, 0, len(slots)), generation: nodes.NextGeneration(), createdAt: time.Now(), } - isLoopbackOrigin := isLoopbackAddr(origin) + originHost, _, _ := net.SplitHostPort(origin) + isLoopbackOrigin := isLoopback(originHost) + for _, slot := range slots { var nodes []*clusterNode for i, slotNode := range slot.Nodes { addr := slotNode.Addr - if !isLoopbackOrigin && useOriginAddr(origin, addr) { - addr = origin + if !isLoopbackOrigin { + addr = replaceLoopbackHost(addr, originHost) } node, err := c.nodes.GetOrCreate(addr) @@ -430,11 +464,15 @@ func newClusterState( } } - for i := slot.Start; i <= slot.End; i++ { - c.slots[i] = nodes - } + c.slots = append(c.slots, &clusterSlot{ + start: slot.Start, + end: slot.End, + nodes: nodes, + }) } + sort.Sort(clusterSlotSlice(c.slots)) + time.AfterFunc(time.Minute, func() { nodes.GC(c.generation) }) @@ -442,6 +480,33 @@ func newClusterState( return &c, nil } +func replaceLoopbackHost(nodeAddr, originHost string) string { + nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) + if err != nil { + 
return nodeAddr + } + + nodeIP := net.ParseIP(nodeHost) + if nodeIP == nil { + return nodeAddr + } + + if !nodeIP.IsLoopback() { + return nodeAddr + } + + // Use origin host which is not loopback and node port. + return net.JoinHostPort(originHost, nodePort) +} + +func isLoopback(host string) bool { + ip := net.ParseIP(host) + if ip == nil { + return true + } + return ip.IsLoopback() +} + func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) if len(nodes) > 0 { @@ -502,32 +567,24 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode { } func (c *clusterState) slotNodes(slot int) []*clusterNode { - if slot >= 0 && slot < len(c.slots) { - return c.slots[slot] + i := sort.Search(len(c.slots), func(i int) bool { + return c.slots[i].end >= slot + }) + if i >= len(c.slots) { + return nil + } + x := c.slots[i] + if slot >= x.start && slot <= x.end { + return x.nodes } return nil } func (c *clusterState) IsConsistent() bool { - if len(c.Masters) > len(c.Slaves) { - return false - } - - for _, master := range c.Masters { - s := master.Client.Info("replication").Val() - if !strings.Contains(s, "role:master") { - return false - } - } - - for _, slave := range c.Slaves { - s := slave.Client.Info("replication").Val() - if !strings.Contains(s, "role:slave") { - return false - } + if c.nodes.opt.ClusterSlots != nil { + return true } - - return true + return len(c.Masters) <= len(c.Slaves) } //------------------------------------------------------------------------------ @@ -555,7 +612,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) { return nil, err } if !state.IsConsistent() { - c.LazyReload() + time.AfterFunc(time.Second, c.LazyReload) } return state, nil } @@ -614,6 +671,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) { return nil, errors.New("redis: cluster has no state") } +func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { + state, err := c.Reload() + if err == nil { + return state, nil + } + return c.Get() +} + //------------------------------------------------------------------------------ // ClusterClient is a Redis Cluster client representing a pool of zero @@ -653,6 +718,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c.init() _, _ = c.state.Reload() + _, _ = c.cmdsInfoCache.Get() + if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -660,6 +727,13 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } +// ReloadState reloads cluster state. It calls ClusterSlots func +// to get cluster slots information. +func (c *ClusterClient) ReloadState() error { + _, err := c.state.Reload() + return err +} + func (c *ClusterClient) init() { c.cmdable.setProcessor(c.Process) } @@ -818,6 +892,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { } if internal.IsRetryableError(err, true) { + c.state.LazyReload() continue } @@ -853,6 +928,13 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +// Do creates a Cmd from the args and processes the cmd. +func (c *ClusterClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + func (c *ClusterClient) WrapProcess( fn func(oldProcess func(Cmder) error) func(Cmder) error, ) { @@ -904,12 +986,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { } if internal.IsRetryableError(err, true) { - // Firstly retry the same node. + c.state.LazyReload() + + // First retry the same node. 
if attempt == 0 { continue } - // Secondly try random node. + // Second try random node. node, err = c.nodes.Random() if err != nil { break @@ -944,12 +1028,9 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { // ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -980,12 +1061,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { // ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -1016,12 +1094,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { // ForEachNode concurrently calls the fn on each known node in the cluster. // It returns the first error if any. func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - state, err := c.state.Reload() + state, err := c.state.ReloadOrGet() if err != nil { - state, err = c.state.Get() - if err != nil { - return err - } + return err } var wg sync.WaitGroup @@ -1071,7 +1146,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1082,7 +1157,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1090,6 +1165,14 @@ func (c *ClusterClient) PoolStats() *PoolStats { } func (c *ClusterClient) loadState() (*clusterState, error) { + if c.opt.ClusterSlots != nil { + slots, err := c.opt.ClusterSlots() + if err != nil { + return nil, err + } + return newClusterState(c.nodes, slots, "") + } + addrs, err := c.nodes.Addrs() if err != nil { return nil, err @@ -1196,7 +1279,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { cmdsMap = failedCmds } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { @@ -1207,9 +1290,16 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e } cmdsMap := make(map[*clusterNode][]Cmder) + cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { - slot := c.cmdSlot(cmd) - node, err := state.slotMasterNode(slot) + var node *clusterNode + var err error + if cmdsAreReadOnly { + _, node, err = c.cmdSlotAndNode(cmd) + } else { + slot := c.cmdSlot(cmd) + node, err = state.slotMasterNode(slot) + } if err != nil { return nil, err } @@ -1218,6 +1308,16 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e return cmdsMap, nil } +func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { + for _, cmd := range cmds { + cmdInfo := c.cmdInfo(cmd.Name()) + if cmdInfo == nil || !cmdInfo.ReadOnly { + return false + } + } + return true +} + func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { 
remappedCmds, err := c.mapCmdsByNode(cmds) if err != nil { @@ -1233,26 +1333,26 @@ func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cm func (c *ClusterClient) pipelineProcessCmds( node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - _ = cn.SetWriteTimeout(c.opt.WriteTimeout) - - err := writeCmd(cn, cmds...) + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) if err != nil { setCmdsErr(cmds, err) failedCmds[node] = cmds return err } - // Set read timeout for all commands. - _ = cn.SetReadTimeout(c.opt.ReadTimeout) - - return c.pipelineReadCmds(cn, cmds, failedCmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return c.pipelineReadCmds(rd, cmds, failedCmds) + }) + return err } func (c *ClusterClient) pipelineReadCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err == nil { continue } @@ -1361,7 +1461,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { @@ -1376,35 +1476,37 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { func (c *ClusterClient) txPipelineProcessCmds( node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) failedCmds[node] = cmds return err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { - setCmdsErr(cmds, err) - return err - } - - return pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := c.txPipelineReadQueued(rd, cmds, failedCmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return err } func (c *ClusterClient) txPipelineReadQueued( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + if err := statusCmd.readReply(rd); err != nil { return err } for _, cmd := range cmds { - err := statusCmd.readReply(cn) + err := statusCmd.readReply(rd) if err == nil { continue } @@ -1417,7 +1519,7 @@ func (c *ClusterClient) txPipelineReadQueued( } // Parse number of replies. 
- line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -1445,11 +1547,9 @@ func (c *ClusterClient) txPipelineReadQueued( } func (c *ClusterClient) pubSub(channels []string) *PubSub { - opt := c.opt.clientOptions() - var node *clusterNode - return &PubSub{ - opt: opt, + pubsub := &PubSub{ + opt: c.opt.clientOptions(), newConn: func(channels []string) (*pool.Conn, error) { if node == nil { @@ -1472,6 +1572,8 @@ func (c *ClusterClient) pubSub(channels []string) *PubSub { return node.Client.connPool.CloseConn(cn) }, } + pubsub.init() + return pubsub } // Subscribe subscribes the client to the specified channels. @@ -1494,43 +1596,6 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { return pubsub } -func useOriginAddr(originAddr, nodeAddr string) bool { - nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) - if err != nil { - return false - } - - nodeIP := net.ParseIP(nodeHost) - if nodeIP == nil { - return false - } - - if !nodeIP.IsLoopback() { - return false - } - - _, originPort, err := net.SplitHostPort(originAddr) - if err != nil { - return false - } - - return nodePort == originPort -} - -func isLoopbackAddr(addr string) bool { - host, _, err := net.SplitHostPort(addr) - if err != nil { - return false - } - - ip := net.ParseIP(host) - if ip == nil { - return false - } - - return ip.IsLoopback() -} - func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { for _, n := range nodes { if n == node { diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index 552c897bb..ca44d7c8b 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -1,16 +1,14 @@ package redis import ( - "bytes" "fmt" + "net" "strconv" "strings" "time" "github.com/go-redis/redis/internal" - "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" - "github.com/go-redis/redis/internal/util" ) type Cmder interface { @@ -18,13 +16,12 @@ type Cmder interface { Args() []interface{} stringArg(int) string - readReply(*pool.Conn) error + readReply(rd *proto.Reader) error setErr(error) readTimeout() *time.Duration Err() error - fmt.Stringer } func setCmdsErr(cmds []Cmder, e error) { @@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) { } } -func firstCmdsErr(cmds []Cmder) error { +func cmdsFirstErr(cmds []Cmder) error { for _, cmd := range cmds { if err := cmd.Err(); err != nil { return err @@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error { return nil } -func writeCmd(cn *pool.Conn, cmds ...Cmder) error { - cn.Wb.Reset() +func writeCmd(wr *proto.Writer, cmds ...Cmder) error { for _, cmd := range cmds { - if err := cn.Wb.Append(cmd.Args()); err != nil { + err := wr.WriteArgs(cmd.Args()) + if err != nil { return err } } - - _, err := cn.Write(cn.Wb.Bytes()) - return err + return nil } func cmdString(cmd Cmder, val interface{}) string { @@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) { return cmd.val, cmd.err } -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } } -func (cmd *Cmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser) +func (cmd *Cmd) Int() (int, error) { if 
cmd.err != nil { - return cmd.err + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return int(val), nil + case string: + return strconv.Atoi(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err } - if b, ok := cmd.val.([]byte); ok { - // Bytes must be copied, because underlying memory is reused. - cmd.val = string(b) +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err } - return nil +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } +} + +func (cmd *Cmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) + return cmd.err +} + +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case string: + vals = append(vals, v) + default: + vals = append(vals, v) + } + } + return vals, nil } //------------------------------------------------------------------------------ @@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) readReply(cn *pool.Conn) error { +func (cmd *SliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(sliceParser) + v, cmd.err = rd.ReadArrayReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadStringReply() +func (cmd *StatusCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadIntReply() +func (cmd *IntCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() return cmd.err } @@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(cn *pool.Conn) error { +func (cmd *DurationCmd) readReply(rd 
*proto.Reader) error { var n int64 - n, cmd.err = cn.Rd.ReadIntReply() + n, cmd.err = rd.ReadIntReply() if cmd.err != nil { return cmd.err } @@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(cn *pool.Conn) error { +func (cmd *TimeCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(timeParser) + v, cmd.err = rd.ReadArrayReply(timeParser) if cmd.err != nil { return cmd.err } @@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + //------------------------------------------------------------------------------ type BoolCmd struct { @@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string { return cmdString(cmd, cmd.val) } -var ok = []byte("OK") - -func (cmd *BoolCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadReply(nil) + v, cmd.err = rd.ReadReply(nil) // `SET key value NX` returns nil when key already exists. But // `SETNX key value` returns bool (0/1). So convert nil to bool. // TODO: is this okay? @@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { case int64: cmd.val = v == 1 return nil - case []byte: - cmd.val = bytes.Equal(v, ok) + case string: + cmd.val = v == "OK" return nil default: cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) @@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { type StringCmd struct { baseCmd - val []byte + val string } var _ Cmder = (*StringCmd)(nil) @@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd { } func (cmd *StringCmd) Val() string { - return util.BytesToString(cmd.val) + return cmd.val } func (cmd *StringCmd) Result() (string, error) { @@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) { } func (cmd *StringCmd) Bytes() ([]byte, error) { - return cmd.val, cmd.err + return []byte(cmd.val), cmd.err +} + +func (cmd *StringCmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.Atoi(cmd.Val()) } func (cmd *StringCmd) Int64() (int64, error) { @@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error { if cmd.err != nil { return cmd.err } - return proto.Scan(cmd.val, val) + return proto.Scan([]byte(cmd.val), val) } func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadBytesReply() +func (cmd *StringCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() return cmd.err } @@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *StringSliceCmd) readReply(rd 
*proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser) + v, cmd.err = rd.ReadArrayReply(stringSliceParser) if cmd.err != nil { return cmd.err } @@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadString() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + //------------------------------------------------------------------------------ type BoolSliceCmd struct { @@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser) + v, cmd.err = rd.ReadArrayReply(boolSliceParser) if cmd.err != nil { return cmd.err } @@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + //------------------------------------------------------------------------------ type StringStringMapCmd struct { @@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser) + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) if cmd.err != nil { return cmd.err } @@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ type StringIntMapCmd struct { @@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser) + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) if cmd.err != nil { return cmd.err } @@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + //------------------------------------------------------------------------------ type 
StringStructMapCmd struct { @@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser) + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) if cmd.err != nil { return cmd.err } @@ -712,6 +902,380 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = struct{}{} + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type XMessage struct { + ID string + Values map[string]interface{} +} + +type XMessageSliceCmd struct { + baseCmd + + val []XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]XMessage, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + +//------------------------------------------------------------------------------ + +type XStream struct { + Stream string + Messages []XMessage +} + +type XStreamSliceCmd struct { + baseCmd + + val []XStream +} + +var _ Cmder = (*XStreamSliceCmd)(nil) + +func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { + return &XStreamSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XStreamSliceCmd) Val() []XStream { + return cmd.val +} + +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XStreamSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XStream) + return 
nil +} + +// Implements proto.MultiBulkParse +func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XStream, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} + +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + lower, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + higher, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumerPending, err := rd.ReadInt() + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil +} + +//------------------------------------------------------------------------------ + +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} + +type XPendingExtCmd struct { + baseCmd + val []XPendingExt +} + +var _ Cmder = (*XPendingExtCmd)(nil) + +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingExtCmd) Val() []XPendingExt { + return cmd.val +} + +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingExtCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = 
rd.ReadArrayReply(xPendingExtSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.([]XPendingExt) + return nil +} + +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + //------------------------------------------------------------------------------ type ZSliceCmd struct { @@ -740,9 +1304,9 @@ func (cmd *ZSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser) + v, cmd.err = rd.ReadArrayReply(zSliceParser) if cmd.err != nil { return cmd.err } @@ -750,6 +1314,27 @@ func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) + for i := int64(0); i < n; i += 2 { + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + return zz, nil +} + //------------------------------------------------------------------------------ type ScanCmd struct { @@ -782,8 +1367,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(cn *pool.Conn) error { - cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply() +func (cmd *ScanCmd) readReply(rd *proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() return cmd.err } @@ -833,9 +1418,9 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { +func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser) + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) if cmd.err != nil { return cmd.err } @@ -843,6 +1428,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + 
n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadString() + if err != nil { + return nil, err + } + + port, err := rd.ReadString() + if err != nil { + return nil, err + } + + nodes[j].Addr = net.JoinHostPort(ip, port) + + if n == 3 { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + //------------------------------------------------------------------------------ // GeoLocation is used with GeoAdd to add geospatial location. @@ -924,9 +1573,9 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) if cmd.err != nil { return cmd.err } @@ -934,6 +1583,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { return nil } +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case string: + locs = append(locs, GeoLocation{ + Name: vv, + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + //------------------------------------------------------------------------------ type GeoPos struct { @@ -966,9 +1682,9 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.positions) } -func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser) + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) if cmd.err != nil { return cmd.err } @@ -976,6 +1692,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case 
*GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + //------------------------------------------------------------------------------ type CommandInfo struct { @@ -1014,9 +1768,9 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { +func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser) + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) if cmd.err != nil { return cmd.err } @@ -1024,6 +1778,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + //------------------------------------------------------------------------------ type cmdsInfoCache struct { diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index c6a88154e..b259e3a8c 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -62,6 +62,7 @@ type Cmdable interface { TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) TxPipeline() Pipeliner + Command() *CommandsInfoCmd ClientGetName() *StringCmd Echo(message interface{}) *StringCmd Ping() *StatusCmd @@ -171,6 +172,26 @@ type Cmdable interface { SRem(key string, members ...interface{}) *IntCmd SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd + XAdd(a *XAddArgs) *StringCmd + XLen(stream string) *IntCmd + XRange(stream, start, stop string) *XMessageSliceCmd + XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd + XRevRange(stream string, start, stop string) *XMessageSliceCmd + XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd + XRead(a *XReadArgs) *XStreamSliceCmd + XReadStreams(streams 
...string) *XStreamSliceCmd + XGroupCreate(stream, group, start string) *StatusCmd + XGroupSetID(stream, group, start string) *StatusCmd + XGroupDestroy(stream, group string) *IntCmd + XGroupDelConsumer(stream, group, consumer string) *IntCmd + XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd + XAck(stream, group string, ids ...string) *IntCmd + XPending(stream, group string) *XPendingCmd + XPendingExt(a *XPendingExtArgs) *XPendingExtCmd + XClaim(a *XClaimArgs) *XMessageSliceCmd + XClaimJustID(a *XClaimArgs) *StringSliceCmd + XTrim(key string, maxLen int64) *IntCmd + XTrimApprox(key string, maxLen int64) *IntCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -209,6 +230,7 @@ type Cmdable interface { BgRewriteAOF() *StatusCmd BgSave() *StatusCmd ClientKill(ipPort string) *StatusCmd + ClientKillByFilter(keys ...string) *IntCmd ClientList() *StringCmd ClientPause(dur time.Duration) *BoolCmd ConfigGet(parameter string) *SliceCmd @@ -265,9 +287,9 @@ type Cmdable interface { GeoRadiusByMemberRO(key, member string, query *GeoRadiusQuery) *GeoLocationCmd GeoDist(key string, member1, member2, unit string) *FloatCmd GeoHash(key string, members ...string) *StringSliceCmd - Command() *CommandsInfoCmd ReadOnly() *StatusCmd ReadWrite() *StatusCmd + MemoryUsage(key string, samples ...int) *IntCmd } type StatefulCmdable interface { @@ -345,6 +367,12 @@ func (c *statefulCmdable) SwapDB(index1, index2 int) *StatusCmd { //------------------------------------------------------------------------------ +func (c *cmdable) Command() *CommandsInfoCmd { + cmd := NewCommandsInfoCmd("command") + c.process(cmd) + return cmd +} + func (c *cmdable) Del(keys ...string) *IntCmd { args := make([]interface{}, 1+len(keys)) args[0] = "del" @@ -411,7 +439,7 @@ func (c *cmdable) Migrate(host, port, key string, db int64, timeout time.Duratio db, formatMs(timeout), ) - cmd.setReadTimeout(readTimeout(timeout)) + cmd.setReadTimeout(timeout) c.process(cmd) return cmd } @@ -985,7 +1013,7 @@ func (c *cmdable) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd { } args[len(args)-1] = formatSec(timeout) cmd := NewStringSliceCmd(args...) - cmd.setReadTimeout(readTimeout(timeout)) + cmd.setReadTimeout(timeout) c.process(cmd) return cmd } @@ -998,7 +1026,7 @@ func (c *cmdable) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd { } args[len(keys)+1] = formatSec(timeout) cmd := NewStringSliceCmd(args...) 
- cmd.setReadTimeout(readTimeout(timeout)) + cmd.setReadTimeout(timeout) c.process(cmd) return cmd } @@ -1010,7 +1038,7 @@ func (c *cmdable) BRPopLPush(source, destination string, timeout time.Duration) destination, formatSec(timeout), ) - cmd.setReadTimeout(readTimeout(timeout)) + cmd.setReadTimeout(timeout) c.process(cmd) return cmd } @@ -1282,6 +1310,239 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { //------------------------------------------------------------------------------ +type XAddArgs struct { + Stream string + MaxLen int64 // MAXLEN N + MaxLenApprox int64 // MAXLEN ~ N + ID string + Values map[string]interface{} +} + +func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { + args := make([]interface{}, 0, 6+len(a.Values)*2) + args = append(args, "xadd") + args = append(args, a.Stream) + if a.MaxLen > 0 { + args = append(args, "maxlen", a.MaxLen) + } else if a.MaxLenApprox > 0 { + args = append(args, "maxlen", "~", a.MaxLenApprox) + } + if a.ID != "" { + args = append(args, a.ID) + } else { + args = append(args, "*") + } + for k, v := range a.Values { + args = append(args, k) + args = append(args, v) + } + + cmd := NewStringCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XLen(stream string) *IntCmd { + cmd := NewIntCmd("xlen", stream) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +type XReadArgs struct { + Streams []string + Count int64 + Block time.Duration +} + +func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 5+len(a.Streams)) + args = append(args, "xread") + if a.Count > 0 { + args = append(args, "count") + args = append(args, a.Count) + } + if a.Block >= 0 { + args = append(args, "block") + args = append(args, int64(a.Block/time.Millisecond)) + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) 
+ c.process(cmd) + return cmd +} + +func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { + return c.XRead(&XReadArgs{ + Streams: streams, + Block: -1, + }) +} + +func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "setid", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd { + cmd := NewIntCmd("xgroup", "destroy", stream, group) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { + cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) + c.process(cmd) + return cmd +} + +type XReadGroupArgs struct { + Group string + Consumer string + Streams []string + Count int64 + Block time.Duration +} + +func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 8+len(a.Streams)) + args = append(args, "xreadgroup", "group", a.Group, a.Consumer) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + if a.Block >= 0 { + args = append(args, "block", int64(a.Block/time.Millisecond)) + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd { + args := []interface{}{"xack", stream, group} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XPending(stream, group string) *XPendingCmd { + cmd := NewXPendingCmd("xpending", stream, group) + c.process(cmd) + return cmd +} + +type XPendingExtArgs struct { + Stream string + Group string + Start string + End string + Count int64 + Consumer string +} + +func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } + cmd := NewXPendingExtCmd(args...) + c.process(cmd) + return cmd +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { + args := xClaimArgs(a) + cmd := NewXMessageSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { + args := xClaimArgs(a) + args = append(args, "justid") + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func xClaimArgs(a *XClaimArgs) []interface{} { + args := make([]interface{}, 0, 4+len(a.Messages)) + args = append(args, + "xclaim", + a.Stream, + a.Group, a.Consumer, + int64(a.MinIdle/time.Millisecond)) + for _, id := range a.Messages { + args = append(args, id) + } + return args +} + +func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) + c.process(cmd) + return cmd +} + +func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) + c.process(cmd) + return cmd +} + +//------------------------------------------------------------------------------ + // Z represents sorted set member. 
type Z struct {
 	Score float64
@@ -1682,6 +1943,20 @@ func (c *cmdable) ClientKill(ipPort string) *StatusCmd {
 	return cmd
 }
 
+// ClientKillByFilter is the new-style syntax, while ClientKill is the old one.
+// CLIENT KILL
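
The changelog above introduces `Client.Do` and the typed `Cmd` helpers (`Cmd.String`, `Cmd.Int`, `Cmd.Int64`, `Cmd.Uint64`, `Cmd.Float64`, `Cmd.Bool`) that command.go implements on top of the new `proto.Reader`. A minimal sketch of how they combine; the server address and key name are placeholders:

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	// Placeholder address; any reachable Redis server works.
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Do builds an arbitrary command from its arguments, so commands
	// without a typed wrapper no longer need a hand-rolled Cmd.
	if err := client.Do("set", "counter", 10).Err(); err != nil {
		panic(err)
	}

	// The typed helpers convert the generic reply and return an error
	// when the reply has an unexpected type.
	n, err := client.Do("incrby", "counter", 5).Int64()
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 15
}
```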
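cluster.go above adds `ClusterOptions.ClusterSlots` and `ClusterClient.ReloadState` for running a cluster of standalone servers without cluster mode. A sketch of that manual setup, assuming a hypothetical topology of one master (:7000) and two slaves (:7001, :7002); note that `RouteRandomly` now implies `ReadOnly`, which `opt.init` enables automatically:

```go
package main

import "github.com/go-redis/redis"

func main() {
	// One slot range covering all 16384 hash slots, served by a master
	// and two slaves. In a real deployment this function could read the
	// topology from a service such as ZooKeeper.
	clusterSlots := func() ([]redis.ClusterSlot, error) {
		return []redis.ClusterSlot{{
			Start: 0,
			End:   16383,
			Nodes: []redis.ClusterNode{
				{Addr: ":7000"}, // master
				{Addr: ":7001"}, // slave
				{Addr: ":7002"}, // slave
			},
		}}, nil
	}

	client := redis.NewClusterClient(&redis.ClusterOptions{
		ClusterSlots:  clusterSlots,
		RouteRandomly: true, // spread read-only commands across nodes
	})

	if err := client.Ping().Err(); err != nil {
		panic(err)
	}

	// When the topology changes, ReloadState calls ClusterSlots again
	// and rebuilds the slot map.
	if err := client.ReloadState(); err != nil {
		panic(err)
	}
}
```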
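commands.go above adds the Redis Streams commands (XADD, XRANGE, XREAD, and the consumer-group family). A sketch of the basic produce/read cycle, assuming a server new enough to support streams (Redis 5.0); the stream and field names are placeholders:

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// XADD mystream * event created
	// Leaving XAddArgs.ID empty makes XAdd send "*", so the server
	// assigns the entry ID.
	id, err := client.XAdd(&redis.XAddArgs{
		Stream: "mystream",
		Values: map[string]interface{}{"event": "created"},
	}).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("added", id)

	// XREAD STREAMS mystream 0
	// XReadStreams takes stream names followed by their start IDs and
	// does not block (it calls XRead with Block: -1).
	streams, err := client.XReadStreams("mystream", "0").Result()
	if err != nil {
		panic(err)
	}
	for _, stream := range streams {
		for _, msg := range stream.Messages {
			fmt.Println(msg.ID, msg.Values)
		}
	}
}
```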
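For the consumer-group side (XGROUP CREATE, XREADGROUP, XACK), a sketch of one read-process-acknowledge pass under the same assumptions; the group and consumer names are made up, and the BUSYGROUP remark reflects how Redis responds when a group already exists:

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Create the group at the start of the stream. The error is ignored
	// here because Redis rejects the call (BUSYGROUP) when the group
	// already exists.
	_ = client.XGroupCreate("mystream", "mygroup", "0").Err()

	// ">" requests entries never delivered to any consumer of this
	// group; Block: -1 keeps the call non-blocking, and redis.Nil just
	// means there was nothing to read.
	streams, err := client.XReadGroup(&redis.XReadGroupArgs{
		Group:    "mygroup",
		Consumer: "consumer-1",
		Streams:  []string{"mystream", ">"},
		Count:    10,
		Block:    -1,
	}).Result()
	if err != nil && err != redis.Nil {
		panic(err)
	}

	for _, stream := range streams {
		for _, msg := range stream.Messages {
			fmt.Println("processing", msg.ID, msg.Values)

			// Acknowledge so the entry is removed from the group's
			// pending entries list.
			if err := client.XAck(stream.Stream, "mygroup", msg.ID).Err(); err != nil {
				panic(err)
			}
		}
	}
}
```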